From c582b07df30e0bd4612077ebaa6e51a556f97c3c Mon Sep 17 00:00:00 2001
From: James Baiera
Date: Mon, 24 Feb 2020 13:49:52 -0500
Subject: [PATCH 1/4] Update the Kerberos QA project to use the new test cluster API. Also update the project to run fixtures on the Java 8 runtime.

Java home can now be set on cluster configs and is used by fixtures. A cluster can now be configured with a dependent ElasticsearchCluster; any configuration tasks depend on that cluster in case its address is used.

Settings are now stored as Objects in cluster configurations instead of Strings, since they may be GStrings containing closures that must be resolved ONLY at configuration-writing time. This is needed to resolve test cluster HTTP addresses, which are only discoverable once the cluster is started. To simplify this resolution process, most of the Map types in the code were replaced with the FileSettings type, which resolves closures as needed when writing the configuration to a file.

Upgraded the Spark QA runtime to 2.3.4, as the older version has been removed from Apache's mirrors.

AbstractClusterTask now extends DefaultTestClustersTask, as that is the only way to ensure that Elasticsearch is still running when the task starts. To configure JAVA_HOME consistently, each AbstractClusterTask now has its environment variables resolved in its superclass instead of duplicating that code in each task.
---
 .../fixture/hadoop/ConfigFormats.groovy | 14 ++--
 .../hadoop/HadoopClusterFormationTasks.groovy | 77 ++++++++++++-----
 .../gradle/fixture/hadoop/InstanceInfo.groovy | 26 +++---
 .../fixture/hadoop/ServiceDescriptor.groovy | 6 +-
 .../conf/EndProcessConfiguration.groovy | 17 +++-
 .../conf/HadoopClusterConfiguration.groovy | 6 +-
 .../hadoop/conf/ProcessConfiguration.groovy | 41 ++++++++-
 .../hadoop/conf/SettingsContainer.groovy | 47 ++++++++---
 .../services/HadoopServiceDescriptor.groovy | 46 +++++-----
 .../services/HiveServiceDescriptor.groovy | 8 +-
 .../services/PigServiceDescriptor.groovy | 6 +-
 .../SparkYarnServiceDescriptor.groovy | 13 +--
 .../hadoop/tasks/AbstractClusterTask.groovy | 35 +++++++-
 .../fixture/hadoop/tasks/DfsCopy.groovy | 31 +++----
 .../fixture/hadoop/tasks/HadoopMRJob.groovy | 83 ++++++++++++-------
 .../fixture/hadoop/tasks/HiveBeeline.groovy | 32 ++++---
 .../fixture/hadoop/tasks/PigScript.groovy | 27 ++++--
 .../fixture/hadoop/tasks/SparkApp.groovy | 37 ++++++---
 .../hadoop/gradle/util/ObjectUtil.groovy | 32 +++++++
 qa/kerberos/build.gradle | 73 +++++++++-------
 .../qa/kerberos/setup/SetupKerberosUsers.java | 2 -
 .../qa/kerberos/storm/StreamFromEs.java | 7 +-
 .../hadoop/qa/kerberos/storm/StreamToEs.java | 7 +-
 .../src/main/resources/hive/load_to_es.sql | 1 -
 .../src/main/resources/hive/read_from_es.sql | 2 +-
 .../src/main/resources/pig/load_to_es.pig | 1 -
 .../src/main/resources/pig/read_from_es.pig | 1 -
 test/fixtures/minikdc/build.gradle | 4 +
 28 files changed, 466 insertions(+), 216 deletions(-)
 create mode 100644 buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/util/ObjectUtil.groovy

diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/ConfigFormats.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/ConfigFormats.groovy
index 84e7519c7..8cd6f3b91 100644
--- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/ConfigFormats.groovy
+++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/ConfigFormats.groovy
@@ -19,11 +19,13 @@
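The deferred-resolution behavior described in the message above can be sketched in a few lines of plain Groovy (a minimal illustration, assuming the buildSrc classes are on the classpath; the setting names and the localhost:9500 address are examples only): a value stored as a closure is only converted to a String when FileSettings.resolve() is called, i.e. when the configuration file is rendered.

    import org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.SettingsContainer

    def settings = new SettingsContainer.FileSettings()

    // The cluster's HTTP address is unknown until the test cluster has started,
    // so store a closure instead of a String and defer its evaluation.
    String httpAddress = null
    settings.addSetting('es.nodes', { -> httpAddress })
    settings.addSetting('dfs.replication', '1')

    httpAddress = 'localhost:9500'   // the address becomes known once the cluster is up

    // resolve() unwraps closures via ObjectUtil.unapplyString at write time.
    assert settings.resolve() == ['es.nodes': 'localhost:9500', 'dfs.replication': '1']

The qa/kerberos/build.gradle changes further down rely on the same mechanism through a lazy GString ("${-> testClusters.integTest.getAllHttpSocketURI().get(0)}"), whose embedded closure is likewise evaluated only when the setting is finally written out.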
package org.elasticsearch.hadoop.gradle.fixture.hadoop +import static org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.SettingsContainer.FileSettings + class ConfigFormats { static Closure hadoopXML() { - return { Map conf -> - String props = conf.collect { key, value -> + return { FileSettings conf -> + String props = conf.resolve().collect { key, value -> "\n\t\t${key}\n\t\t${value}\n\t" }.join("\n\t") return "\n\t${props}\n" @@ -31,16 +33,16 @@ class ConfigFormats { } static Closure propertyFile() { - return { Map conf -> - conf.collect { key, value -> + return { FileSettings conf -> + conf.resolve().collect { key, value -> "${key}=${value}" }.join("\n") } } static Closure whiteSpaced() { - return { Map conf -> - conf.collect { key, value -> + return { FileSettings conf -> + conf.resolve().collect { key, value -> "${key} ${value}" }.join("\n") } diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/HadoopClusterFormationTasks.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/HadoopClusterFormationTasks.groovy index fe6548d1a..9557d03a2 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/HadoopClusterFormationTasks.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/HadoopClusterFormationTasks.groovy @@ -23,6 +23,7 @@ import org.apache.tools.ant.DefaultLogger import org.elasticsearch.gradle.LoggedExec import org.elasticsearch.gradle.Version import org.elasticsearch.gradle.test.Fixture +import org.elasticsearch.gradle.testclusters.DefaultTestClustersTask import org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.HadoopClusterConfiguration import org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.InstanceConfiguration import org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.RoleConfiguration @@ -42,6 +43,8 @@ import org.apache.tools.ant.taskdefs.condition.Os import java.nio.file.Paths +import static org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.SettingsContainer.FileSettings + /** * A helper for creating tasks to build a cluster that is used by a task, and tear down the cluster * when the task is finished. 
@@ -49,11 +52,12 @@ import java.nio.file.Paths class HadoopClusterFormationTasks { /** - * A start and stop task for a fixture + * A start and stop task for an instance, as well as any and all instance specific setup and teardown tasks */ - static class TaskPair { + static class InstanceTasks { Task startTask Task stopTask + List allTasks } /** @@ -99,20 +103,20 @@ class HadoopClusterFormationTasks { List nodes = [] // Create the fixtures for each service - List clusterTaskPairs = [] + List clusterTaskPairs = [] for (ServiceConfiguration serviceConfiguration : clusterConfiguration.getServices()) { // Get the download task for this service's package and add it to the service's dependency tasks DistributionTasks distributionTasks = getOrConfigureDistributionDownload(project, serviceConfiguration) // Keep track of the start tasks in this service - List serviceTaskPairs = [] + List serviceTaskPairs = [] // Create fixtures for each role in the service for (RoleConfiguration roleConfiguration : serviceConfiguration.getRoles()) { // Keep track of the start tasks in this role - List roleTaskPairs = [] + List roleTaskPairs = [] // Create fixtures for each instance in the role for (InstanceConfiguration instanceConfiguration : roleConfiguration.getInstances()) { @@ -136,7 +140,7 @@ class HadoopClusterFormationTasks { nodes.add(instanceInfo) // Create the tasks for the instance - TaskPair instanceTasks + InstanceTasks instanceTasks try { instanceTasks = configureNode(project, prefix, instanceDependencies, instanceInfo, distributionTasks) @@ -168,6 +172,16 @@ class HadoopClusterFormationTasks { } } } + + // Check to see if any of the instance tasks are test cluster aware, and if they are, set the + // es cluster to be whichever cluster was configured, if any + if (instanceInfo.elasticsearchCluster != null) { + for (Task instanceTask : instanceTasks.allTasks) { + if (instanceTask instanceof DefaultTestClustersTask) { + ((DefaultTestClustersTask) instanceTask).useCluster(instanceInfo.elasticsearchCluster) + } + } + } } // Make each task in the role depend on and also be finalized by each instance in the service. 
List startTasks = roleTaskPairs.collect{it.startTask} @@ -234,31 +248,44 @@ class HadoopClusterFormationTasks { return new DistributionTasks(download: downloadTask, verify: verifyTask) } - static TaskPair configureNode(Project project, String prefix, Object dependsOn, InstanceInfo node, - DistributionTasks distribution) { - Task setup = project.tasks.create(name: taskName(prefix, node, 'clean'), type: Delete, dependsOn: dependsOn) { + static InstanceTasks configureNode(Project project, String prefix, Object dependsOn, InstanceInfo node, + DistributionTasks distribution) { + List instanceTasks = [] + + Task clean = project.tasks.create(name: taskName(prefix, node, 'clean'), type: Delete, dependsOn: dependsOn) { delete node.homeDir delete node.cwd group = 'hadoopFixture' } + instanceTasks.add(clean) // Only create CWD and check previous if the role is an executable process + Task lastInitTask = clean if (node.getConfig().getRoleDescriptor().isExecutableProcess()) { - setup = project.tasks.create(name: taskName(prefix, node, 'createCwd'), type: DefaultTask, dependsOn: setup) { + Task createCwd = project.tasks.create(name: taskName(prefix, node, 'createCwd'), type: DefaultTask, dependsOn: clean) { doLast { node.cwd.mkdirs() } outputs.dir node.cwd group = 'hadoopFixture' } - setup = configureCheckPreviousTask(taskName(prefix, node, 'checkPrevious'), project, setup, node) - setup = configureStopTask(taskName(prefix, node, 'stopPrevious'), project, setup, node) + Task checkPrevious = configureCheckPreviousTask(taskName(prefix, node, 'checkPrevious'), project, createCwd, node) + Task stopPrevious = configureStopTask(taskName(prefix, node, 'stopPrevious'), project, checkPrevious, node) + lastInitTask = stopPrevious + + instanceTasks.add(createCwd) + instanceTasks.add(checkPrevious) + instanceTasks.add(stopPrevious) } // Always extract the package contents, and configure the files - setup = configureExtractTask(taskName(prefix, node, 'extract'), project, setup, node, distribution) - setup = configureWriteConfigTask(taskName(prefix, node, 'configure'), project, setup, node) - setup = configureExtraConfigFilesTask(taskName(prefix, node, 'extraConfig'), project, setup, node) + Task extract = configureExtractTask(taskName(prefix, node, 'extract'), project, lastInitTask, node, distribution) + Task configure = configureWriteConfigTask(taskName(prefix, node, 'configure'), project, extract, node) + Task extraConfig = configureExtraConfigFilesTask(taskName(prefix, node, 'extraConfig'), project, configure, node) + + instanceTasks.add(extract) + instanceTasks.add(configure) + instanceTasks.add(extraConfig) // If the role for this instance is not a process, we skip creating start and stop tasks for it. 
if (!node.getConfig().getRoleDescriptor().isExecutableProcess()) { @@ -267,15 +294,16 @@ class HadoopClusterFormationTasks { for (Object dependency : node.config.getDependencies()) { if (dependency instanceof Fixture) { def depStop = ((Fixture)dependency).stopTask - setup.finalizedBy(depStop) + extraConfig.finalizedBy(depStop) } } - return new TaskPair(startTask: setup) + return new InstanceTasks(startTask: extraConfig, allTasks: instanceTasks) } Map setupCommands = new LinkedHashMap<>() setupCommands.putAll(node.config.getServiceDescriptor().defaultSetupCommands(node.config)) setupCommands.putAll(node.config.getSetupCommands()) + Task lastSetupCommand = extraConfig for (Map.Entry command : setupCommands) { // the first argument is the actual script name, relative to home Object[] args = command.getValue().clone() @@ -295,17 +323,21 @@ class HadoopClusterFormationTasks { commandPath = node.homeDir.toPath().resolve(args[0].toString()).toString() } args[0] = commandPath - setup = configureExecTask(taskName(prefix, node, command.getKey()), project, setup, node, args) + lastSetupCommand = configureExecTask(taskName(prefix, node, command.getKey()), project, lastSetupCommand, node, args) + instanceTasks.add(lastSetupCommand) } // Configure daemon start task - Task start = configureStartTask(taskName(prefix, node, 'start'), project, setup, node) + Task start = configureStartTask(taskName(prefix, node, 'start'), project, lastSetupCommand, node) + instanceTasks.add(start) // Configure wait task Task wait = configureWaitTask(taskName(prefix, node, 'wait'), project, node, start, 30) + instanceTasks.add(wait) // Configure daemon stop task Task stop = configureStopTask(taskName(prefix, node, 'stop'), project, [], node) + instanceTasks.add(stop) // We're running in the background, so make sure that the stop command is called after all cluster tasks finish wait.finalizedBy(stop) @@ -319,7 +351,7 @@ class HadoopClusterFormationTasks { stop.finalizedBy(depStop) } } - return new TaskPair(startTask: wait, stopTask: stop) + return new InstanceTasks(startTask: wait, stopTask: stop, allTasks: instanceTasks) } static Task configureCheckPreviousTask(String name, Project project, Task setup, InstanceInfo node) { @@ -342,13 +374,13 @@ class HadoopClusterFormationTasks { static Task configureWriteConfigTask(String name, Project project, Task setup, InstanceInfo node) { // Add all node level configs to node Configuration - return project.tasks.create(name: name, type: DefaultTask, dependsOn: setup) { + return project.tasks.create(name: name, type: DefaultTestClustersTask, dependsOn: setup) { group = 'hadoopFixture' doFirst { // Write each config file needed node.configFiles.forEach { configFile -> String configName = configFile.getName() - Map configFileEntries = node.configContents.get(configName) + FileSettings configFileEntries = node.configContents.get(configName) if (configFileEntries == null) { throw new GradleException("Could not find contents of [${configFile}] settings file from deployment options.") } @@ -387,7 +419,6 @@ class HadoopClusterFormationTasks { return project.tasks.create(name: name, type: LoggedExec, dependsOn: setup) { Exec exec -> exec.group = 'hadoopFixture' exec.workingDir node.cwd - exec.environment 'JAVA_HOME', node.getJavaHome() exec.environment(node.env) // Configure HADOOP_OPTS (or similar env) - adds system properties, assertion flags, remote debug etc diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/InstanceInfo.groovy 
b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/InstanceInfo.groovy index 183aceeaf..92c39195b 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/InstanceInfo.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/InstanceInfo.groovy @@ -20,7 +20,7 @@ package org.elasticsearch.hadoop.gradle.fixture.hadoop import org.apache.tools.ant.taskdefs.condition.Os -import org.elasticsearch.gradle.info.BuildParams +import org.elasticsearch.gradle.testclusters.ElasticsearchCluster import org.elasticsearch.hadoop.gradle.util.WaitForURL import org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.InstanceConfiguration import org.gradle.api.GradleException @@ -30,6 +30,8 @@ import java.nio.file.Path import java.nio.file.Paths import java.util.concurrent.TimeUnit +import static org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.SettingsContainer.FileSettings + /** * Generic information for any process running in a hadoop ecosystem. * @@ -70,7 +72,7 @@ class InstanceInfo { /** The config files */ List configFiles - Map> configContents + Map configContents /** Closure that renders the contents of the config file */ Closure configFileFormatter @@ -84,8 +86,8 @@ class InstanceInfo { /** stdout/stderr log of the service process for this instance */ File startLog - /** Major version of java this node runs with, or {@code null} if using the runtime java version */ - Integer javaVersion + /** Location of the java installation to use when running processes **/ + String javaHome /** environment variables to start the node with */ Map env @@ -108,6 +110,9 @@ class InstanceInfo { /** buffer for ant output when starting this node */ ByteArrayOutputStream buffer = new ByteArrayOutputStream() + /** Elasticsearch cluster dependency for tasks **/ + ElasticsearchCluster elasticsearchCluster + /** * A closure to call before the cluster is considered ready. The closure is passed the node info, * as well as a groovy AntBuilder, to enable running ant condition checks. The default wait @@ -155,13 +160,16 @@ class InstanceInfo { startLog = new File(cwd, 'run.log') // We just default to the current runtime at this time - javaVersion = 8 + javaHome = config.getJavaHome() // Prepare Environment env = [:] env.putAll(config.getEnvironmentVariables()) config.getServiceDescriptor().finalizeEnv(env, config) + // Add JAVA_HOME to the environment + env['JAVA_HOME'] = javaHome + // Prepare startup command and arguments args = [] List startCommandLine = config.getServiceDescriptor().startCommand(config) @@ -202,6 +210,8 @@ class InstanceInfo { if (Os.isFamily(Os.FAMILY_WINDOWS)) { args.add('"') // end the entire command, quoted } + + this.elasticsearchCluster = config.getElasticsearchCluster() } Path binPath() { @@ -235,11 +245,6 @@ class InstanceInfo { throw new UnsupportedOperationException("JNAKernal32Library is compiled for Java 10 and up.") } - /** Return the java home used by this node. */ - String getJavaHome() { - return javaVersion == null ? project.runtimeJavaHome : BuildParams.javaVersions.find { it.version == javaVersion }.javaHome.absolutePath - } - /** Returns debug string for the command that started this node. 
*/ String getCommandString() { String commandString = "\nService ${config.serviceDescriptor.serviceName()}: ${config.roleDescriptor.roleName()} configuration:\n" @@ -247,7 +252,6 @@ class InstanceInfo { commandString += "| cwd: ${cwd}\n" commandString += "| command: ${executable} ${args.join(' ')}\n" commandString += '| environment:\n' - commandString += "| JAVA_HOME: ${javaHome}\n" env.each { k, v -> commandString += "| ${k}: ${v}\n" } commandString += "|\n| [${backgroundScript.name}]\n" backgroundScript.eachLine('UTF-8', { line -> commandString += " ${line}\n"}) diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/ServiceDescriptor.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/ServiceDescriptor.groovy index 0848460fe..4f01c94d3 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/ServiceDescriptor.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/ServiceDescriptor.groovy @@ -24,6 +24,8 @@ import org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.InstanceConfiguration import org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.ServiceConfiguration import org.elasticsearch.hadoop.gradle.tasks.ApacheMirrorDownload +import static org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.SettingsContainer.FileSettings + /** * Describes deployment characteristics for different Hadoop ecosystem projects. * @@ -100,7 +102,7 @@ interface ServiceDescriptor { /** * Collect all configuration entries, setting defaults for the service, role, and instance. */ - Map> collectConfigFilesContents(InstanceConfiguration configuration) + Map collectConfigFilesContents(InstanceConfiguration configuration) /** * Closure that formats a configuration map into a String for the config file contents. @@ -110,7 +112,7 @@ interface ServiceDescriptor { /** * Produces the HTTP/S URI to reach the web front end for a running instance, or null if there is no web interface. */ - String httpUri(InstanceConfiguration configuration, Map> configFileContents) + String httpUri(InstanceConfiguration configuration, Map configFileContents) /** * The command line to use for starting the given role and instance. diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/EndProcessConfiguration.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/EndProcessConfiguration.groovy index a781d4539..412e509c0 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/EndProcessConfiguration.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/EndProcessConfiguration.groovy @@ -19,6 +19,9 @@ package org.elasticsearch.hadoop.gradle.fixture.hadoop.conf +import org.elasticsearch.gradle.testclusters.ElasticsearchCluster +import org.gradle.api.Project + /** * Provides defaults and can be slotted in as the last parent configuration in a chain. 
* @@ -29,8 +32,8 @@ package org.elasticsearch.hadoop.gradle.fixture.hadoop.conf */ class EndProcessConfiguration extends ProcessConfiguration { - EndProcessConfiguration() { - super(null) + EndProcessConfiguration(Project project) { + super(project) } @Override @@ -68,6 +71,11 @@ class EndProcessConfiguration extends ProcessConfiguration { return Collections.emptyList() } + @Override + String getJavaHome() { + return project.runtimeJavaHome + } + @Override String getJvmArgs() { return "" @@ -77,4 +85,9 @@ class EndProcessConfiguration extends ProcessConfiguration { boolean getDebug() { return false } + + @Override + ElasticsearchCluster getElasticsearchCluster() { + return null + } } diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/HadoopClusterConfiguration.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/HadoopClusterConfiguration.groovy index 29e6ea525..045be006c 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/HadoopClusterConfiguration.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/HadoopClusterConfiguration.groovy @@ -47,11 +47,10 @@ class HadoopClusterConfiguration extends ProcessConfiguration { private static final Map SUPPORTED_SERVICES = [HADOOP, HIVE, PIG, SPARK] .collectEntries { [(it.id()): it] } - private static final ProcessConfiguration END = new EndProcessConfiguration() - private final Project project private final String name private final List clusterTasks + private final ProcessConfiguration defaultConfiguration private final Map serviceConfigurations private final List serviceCreationOrder @@ -60,6 +59,7 @@ class HadoopClusterConfiguration extends ProcessConfiguration { this.project = project this.name = name this.clusterTasks = [] + this.defaultConfiguration = new EndProcessConfiguration(project) this.serviceConfigurations = [:] this.serviceCreationOrder = [] } @@ -126,6 +126,6 @@ class HadoopClusterConfiguration extends ProcessConfiguration { @Override protected ProcessConfiguration parent() { - return END + return defaultConfiguration } } diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/ProcessConfiguration.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/ProcessConfiguration.groovy index 55a59d0b1..806f0ba77 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/ProcessConfiguration.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/ProcessConfiguration.groovy @@ -19,6 +19,7 @@ package org.elasticsearch.hadoop.gradle.fixture.hadoop.conf +import org.elasticsearch.gradle.testclusters.ElasticsearchCluster import org.gradle.api.InvalidUserDataException import org.gradle.api.Project import org.gradle.api.Task @@ -38,7 +39,7 @@ abstract class ProcessConfiguration { this.project = project } - private final Project project + protected final Project project private Map systemProperties = new HashMap<>() private Map environmentVariables = new HashMap<>() private SettingsContainer settingsContainer = new SettingsContainer() @@ -46,8 +47,10 @@ abstract class ProcessConfiguration { private LinkedHashMap setupCommands = new LinkedHashMap<>() private List dependencies = new ArrayList<>() private List clusterTasks = new ArrayList<>() + private String javaHome = null private String jvmArgs = '' private boolean debug = false + private ElasticsearchCluster 
elasticsearchCluster = null void addSystemProperty(String key, String value) { systemProperties.put(key, value) @@ -77,7 +80,7 @@ abstract class ProcessConfiguration { return combined } - void addSetting(String key, String value) { + void addSetting(String key, Object value) { settingsContainer.addSetting(key, value) } @@ -145,6 +148,23 @@ abstract class ProcessConfiguration { return combined } + void setJavaHome(String javaHome) { + this.javaHome = javaHome + } + + String getJavaHome() { + if (this.javaHome != null) { + return this.javaHome + } else { + ProcessConfiguration parent = parent() + if (parent != null) { + return parent.getJavaHome() + } else { + return null + } + } + } + void setJvmArgs(String jvmArgs) { this.jvmArgs = jvmArgs } @@ -178,6 +198,23 @@ abstract class ProcessConfiguration { return debug } + void useElasticsearchCluster(ElasticsearchCluster elasticsearchCluster) { + this.elasticsearchCluster = elasticsearchCluster + } + + ElasticsearchCluster getElasticsearchCluster() { + if (this.elasticsearchCluster != null) { + return this.elasticsearchCluster + } else { + ProcessConfiguration parent = parent() + if (parent != null) { + return parent.getElasticsearchCluster() + } else { + return null + } + } + } + Task createClusterTask(Map options) throws InvalidUserDataException { Task task = project.tasks.create(options) addClusterTask(task) diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/SettingsContainer.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/SettingsContainer.groovy index 79284a05d..701c8b374 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/SettingsContainer.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/SettingsContainer.groovy @@ -19,28 +19,51 @@ package org.elasticsearch.hadoop.gradle.fixture.hadoop.conf +import static org.elasticsearch.hadoop.gradle.util.ObjectUtil.unapplyString + /** * Performs organization and addition of settings for a collection of settings files */ class SettingsContainer { static class FileSettings { - private Map settings = [:] + private Map settings + + FileSettings() { + this([:]) + } - void addSetting(String key, String value) { + FileSettings(Map settings) { + this.settings = settings + } + + void addSetting(String key, Object value) { settings.put(key, value) } - void settings(Map values) { + void settings(Map values) { settings.putAll(values) } - Map getSettings() { - return settings + void putIfAbsent(String key, Object value) { + settings.putIfAbsent(key, value) + } + + Map resolve() { + return settings.collectEntries { String k, Object v -> [(k): unapplyString(v)]} as Map + } + + String get(String key) { + Object value = settings.get(key) + return unapplyString(value) + } + + String getOrDefault(String key, Object value) { + return unapplyString(settings.getOrDefault(key, value)) } } - private Map globalSettings + private Map globalSettings private Map settingsFiles SettingsContainer() { @@ -48,11 +71,11 @@ class SettingsContainer { this.settingsFiles = [:] } - void addSetting(String key, String value) { + void addSetting(String key, Object value) { globalSettings.put(key, value) } - Map getSettings() { + Map getSettings() { return globalSettings } @@ -70,13 +93,13 @@ class SettingsContainer { } } - Map flattenFile(String filename) { - Map flattened = [:] + FileSettings flattenFile(String filename) { + Map flattened = [:] flattened.putAll(globalSettings) FileSettings 
fileSettings = settingsFiles.get(filename) if (fileSettings != null) { - flattened.putAll(fileSettings.getSettings()) + flattened.putAll(fileSettings.settings) } - return flattened + return new FileSettings(flattened) } } diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/HadoopServiceDescriptor.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/HadoopServiceDescriptor.groovy index ea80c81f4..0a15d81fe 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/HadoopServiceDescriptor.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/HadoopServiceDescriptor.groovy @@ -30,6 +30,8 @@ import org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.SettingsContainer import org.elasticsearch.hadoop.gradle.tasks.ApacheMirrorDownload import org.gradle.api.GradleException +import static org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.SettingsContainer.FileSettings + class HadoopServiceDescriptor implements ServiceDescriptor { static final Map> VERSION_MAP = [:] @@ -120,12 +122,12 @@ class HadoopServiceDescriptor implements ServiceDescriptor { } @Override - Map> collectConfigFilesContents(InstanceConfiguration configuration) { + Map collectConfigFilesContents(InstanceConfiguration configuration) { SettingsContainer container = configuration.getSettingsContainer() - Map> files = [:] + Map files = [:] // hdfs-site.xml: - Map hdfsSite = container.flattenFile('hdfs-site.xml') + FileSettings hdfsSite = container.flattenFile('hdfs-site.xml') // default replication should be 1 hdfsSite.putIfAbsent('dfs.replication', '1') @@ -152,7 +154,7 @@ class HadoopServiceDescriptor implements ServiceDescriptor { files.put('hdfs-site.xml', hdfsSite) // yarn-site.xml: - Map yarnSite = container.flattenFile('yarn-site.xml') + FileSettings yarnSite = container.flattenFile('yarn-site.xml') // Set the shuffle options yarnSite.putIfAbsent("yarn.nodemanager.aux-services", "mapreduce_shuffle") @@ -161,7 +163,7 @@ class HadoopServiceDescriptor implements ServiceDescriptor { files.put('yarn-site.xml', yarnSite) // mapred-site.xml - Map mapredSite = container.flattenFile('mapred-site.xml') + FileSettings mapredSite = container.flattenFile('mapred-site.xml') // history server addresses mapredSite.putIfAbsent('mapreduce.jobhistory.address', 'localhost:10020') @@ -170,14 +172,14 @@ class HadoopServiceDescriptor implements ServiceDescriptor { files.put('mapred-site.xml', mapredSite) // core-site.xml: - Map coreSite = container.flattenFile('core-site.xml') + FileSettings coreSite = container.flattenFile('core-site.xml') // default FS settings coreSite.putIfAbsent('fs.defaultFS', "hdfs://${hdfsSite.get('dfs.namenode.rpc-address')}") files.put('core-site.xml', coreSite) // ssl server settings (for HTTPS) - Map sslServer = container.flattenFile('ssl-server.xml') + FileSettings sslServer = container.flattenFile('ssl-server.xml') files.put('ssl-server.xml', sslServer) return files @@ -189,39 +191,39 @@ class HadoopServiceDescriptor implements ServiceDescriptor { } @Override - String httpUri(InstanceConfiguration configuration, Map> configFileContents) { + String httpUri(InstanceConfiguration configuration, Map configFileContents) { RoleDescriptor role = configuration.roleDescriptor if (NAMENODE.equals(role)) { - Map hdfsSite = configFileContents.get('hdfs-site.xml') + FileSettings hdfsSite = configFileContents.get('hdfs-site.xml') if ('HTTPS_ONLY' == hdfsSite.get('dfs.http.policy')) 
{ - return "https://${hdfsSite.get('dfs.namenode.https-address', 'localhost:50470')}" + return "https://${hdfsSite.getOrDefault('dfs.namenode.https-address', 'localhost:50470')}" } else { - return "http://${hdfsSite.get('dfs.namenode.http-address', 'localhost:50070')}" + return "http://${hdfsSite.getOrDefault('dfs.namenode.http-address', 'localhost:50070')}" } } else if (DATANODE.equals(role)) { - Map hdfsSite = configFileContents.get('hdfs-site.xml') + FileSettings hdfsSite = configFileContents.get('hdfs-site.xml') if ('HTTPS_ONLY' == hdfsSite.get('dfs.http.policy')) { - return "https://${hdfsSite.get('dfs.datanode.https-address', 'localhost:50475')}" + return "https://${hdfsSite.getOrDefault('dfs.datanode.https-address', 'localhost:50475')}" } else { - return "http://${hdfsSite.get('dfs.datanode.http-address', 'localhost:50075')}" + return "http://${hdfsSite.getOrDefault('dfs.datanode.http-address', 'localhost:50075')}" } } else if (RESOURCEMANAGER.equals(role)) { - Map yarnSite = configFileContents.get('yarn-site.xml') + FileSettings yarnSite = configFileContents.get('yarn-site.xml') if ('HTTPS_ONLY' == yarnSite.get('yarn.http.policy')) { - return "https://${yarnSite.get('yarn.resourcemanager.webapp.address', 'localhost:8090')}" + return "https://${yarnSite.getOrDefault('yarn.resourcemanager.webapp.address', 'localhost:8090')}" } else { - return "http://${yarnSite.get('yarn.resourcemanager.webapp.https.address', 'localhost:8088')}" + return "http://${yarnSite.getOrDefault('yarn.resourcemanager.webapp.https.address', 'localhost:8088')}" } } else if (NODEMANAGER.equals(role)) { - Map yarnSite = configFileContents.get('yarn-site.xml') + FileSettings yarnSite = configFileContents.get('yarn-site.xml') if ('HTTPS_ONLY' == yarnSite.get('yarn.http.policy')) { - return "https://${yarnSite.get('yarn.nodemanager.webapp.address', 'localhost:8042')}" + return "https://${yarnSite.getOrDefault('yarn.nodemanager.webapp.address', 'localhost:8042')}" } else { - return "http://${yarnSite.get('yarn.nodemanager.webapp.address', 'localhost:8042')}" + return "http://${yarnSite.getOrDefault('yarn.nodemanager.webapp.address', 'localhost:8042')}" } } else if (HISTORYSERVER.equals(role)) { - Map mapredSite = configFileContents.get('mapred-site.xml') - return "http://${mapredSite.get('mapreduce.jobhistory.webapp.address', 'localhost:19888')}" + FileSettings mapredSite = configFileContents.get('mapred-site.xml') + return "http://${mapredSite.getOrDefault('mapreduce.jobhistory.webapp.address', 'localhost:19888')}" } else if (GATEWAY.equals(role)) { return null // No web interface for Gateway } diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/HiveServiceDescriptor.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/HiveServiceDescriptor.groovy index 716ff0227..bef8457bf 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/HiveServiceDescriptor.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/HiveServiceDescriptor.groovy @@ -29,6 +29,8 @@ import org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.ServiceConfiguration import org.elasticsearch.hadoop.gradle.tasks.ApacheMirrorDownload import org.gradle.api.GradleException +import static org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.SettingsContainer.FileSettings + class HiveServiceDescriptor implements ServiceDescriptor { static final Map> VERSION_MAP = [:] @@ -114,8 +116,8 @@ class 
HiveServiceDescriptor implements ServiceDescriptor { } @Override - Map> collectConfigFilesContents(InstanceConfiguration configuration) { - Map hiveSite = configuration.getSettingsContainer().flattenFile('hive-site.xml') + Map collectConfigFilesContents(InstanceConfiguration configuration) { + FileSettings hiveSite = configuration.getSettingsContainer().flattenFile('hive-site.xml') return ['hive-site.xml' : hiveSite] } @@ -125,7 +127,7 @@ class HiveServiceDescriptor implements ServiceDescriptor { } @Override - String httpUri(InstanceConfiguration configuration, Map> configFileContents) { + String httpUri(InstanceConfiguration configuration, Map configFileContents) { if (HIVESERVER.equals(configuration.roleDescriptor)) { return null } diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/PigServiceDescriptor.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/PigServiceDescriptor.groovy index b57a48c62..fa8df3c1a 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/PigServiceDescriptor.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/PigServiceDescriptor.groovy @@ -29,6 +29,8 @@ import org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.ServiceConfiguration import org.elasticsearch.hadoop.gradle.tasks.ApacheMirrorDownload import org.gradle.api.GradleException +import static org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.SettingsContainer.FileSettings + class PigServiceDescriptor implements ServiceDescriptor { static final Map> VERSION_MAP = [:] @@ -112,7 +114,7 @@ class PigServiceDescriptor implements ServiceDescriptor { } @Override - Map> collectConfigFilesContents(InstanceConfiguration configuration) { + Map collectConfigFilesContents(InstanceConfiguration configuration) { return ['pig.properties' : configuration.getSettingsContainer().flattenFile('pig.properties')] } @@ -122,7 +124,7 @@ class PigServiceDescriptor implements ServiceDescriptor { } @Override - String httpUri(InstanceConfiguration configuration, Map> configFileContents) { + String httpUri(InstanceConfiguration configuration, Map configFileContents) { if (GATEWAY.equals(configuration.roleDescriptor)) { return null } diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/SparkYarnServiceDescriptor.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/SparkYarnServiceDescriptor.groovy index 951aef134..170aa91fd 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/SparkYarnServiceDescriptor.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/SparkYarnServiceDescriptor.groovy @@ -29,12 +29,15 @@ import org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.ServiceConfiguration import org.elasticsearch.hadoop.gradle.tasks.ApacheMirrorDownload import org.gradle.api.GradleException +import static org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.SettingsContainer.FileSettings + class SparkYarnServiceDescriptor implements ServiceDescriptor { + static final Version VERSION = new Version(2, 3, 4) static final Map> VERSION_MAP = [:] static { - VERSION_MAP.put(new Version(2, 3, 3), - ['SHA-512': '27CF9AD268E684D6926201BB5478F7F5A410659972BC79FC14AF61245EFF50C9A4363E400311B2FA9E1326E8AF1EB6DDE1D359B88B9143F25A49B3E11596B002']) + VERSION_MAP.put(VERSION, + ['SHA-512': 
'9FBEFCE2739990FFEDE6968A9C2F3FE399430556163BFDABDF5737A8F9E52CD535489F5CA7D641039A87700F50BFD91A706CA47979EE51A3A18787A92E2D6D53']) } static RoleDescriptor GATEWAY = RoleDescriptor.requiredGateway('spark', []) @@ -61,7 +64,7 @@ class SparkYarnServiceDescriptor implements ServiceDescriptor { @Override Version defaultVersion() { - return new Version(2, 3, 3) + return VERSION } @Override @@ -116,7 +119,7 @@ class SparkYarnServiceDescriptor implements ServiceDescriptor { } @Override - Map> collectConfigFilesContents(InstanceConfiguration configuration) { + Map collectConfigFilesContents(InstanceConfiguration configuration) { return ['spark-defaults.conf' : configuration.getSettingsContainer().flattenFile('spark-defaults.conf')] } @@ -126,7 +129,7 @@ class SparkYarnServiceDescriptor implements ServiceDescriptor { } @Override - String httpUri(InstanceConfiguration configuration, Map> configFileContents) { + String httpUri(InstanceConfiguration configuration, Map configFileContents) { if (GATEWAY.equals(configuration.roleDescriptor)) { return null } diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/AbstractClusterTask.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/AbstractClusterTask.groovy index 6e16924b7..c6534b06b 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/AbstractClusterTask.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/AbstractClusterTask.groovy @@ -19,14 +19,15 @@ package org.elasticsearch.hadoop.gradle.fixture.hadoop.tasks +import org.elasticsearch.gradle.testclusters.DefaultTestClustersTask import org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.HadoopClusterConfiguration import org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.InstanceConfiguration -import org.gradle.api.DefaultTask -abstract class AbstractClusterTask extends DefaultTask { +abstract class AbstractClusterTask extends DefaultTestClustersTask { HadoopClusterConfiguration clusterConfiguration InstanceConfiguration executedOn + Map environmentVariables = [:] AbstractClusterTask() { super() @@ -37,4 +38,34 @@ abstract class AbstractClusterTask extends DefaultTask { executedOn = instance } + abstract InstanceConfiguration defaultInstance(HadoopClusterConfiguration clusterConfiguration) + abstract Map taskEnvironmentVariables() + + protected getInstance() { + return executedOn == null ? defaultInstance(this.clusterConfiguration) : executedOn + } + + protected Map collectEnvVars() { + InstanceConfiguration instance = getInstance() + + Map finalEnv = [:] + + // Set JAVA_HOME + finalEnv['JAVA_HOME'] = instance.javaHome + + // User provided environment variables from the cluster configuration + finalEnv.putAll(instance.getEnvironmentVariables()) + + // Finalize the environment variables using the service descriptor + instance.getServiceDescriptor().finalizeEnv(finalEnv, instance) + + // Add any environment variables that might be based on the specific + // task's configuration (jvm options via env, lib jars, etc...) 
+ finalEnv.putAll(taskEnvironmentVariables()) + + // Add the explicit env variables from this task instance at the end + finalEnv.putAll(environmentVariables) + + return finalEnv + } } diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/DfsCopy.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/DfsCopy.groovy index 90dba13ba..bcba37091 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/DfsCopy.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/DfsCopy.groovy @@ -33,7 +33,6 @@ class DfsCopy extends AbstractClusterTask { File localSource File dfsDestination File localDestination - Map env = [:] File getDfsSource() { return dfsSource @@ -95,20 +94,17 @@ class DfsCopy extends AbstractClusterTask { setLocalDestination(path) } - Map getEnv() { - return env - } - - void setEnv(Map env) { - this.env = env - } - - void env(String key, String val) { - env.put(key, val) + @Override + InstanceConfiguration defaultInstance(HadoopClusterConfiguration clusterConfiguration) { + return clusterConfiguration + .service(HadoopClusterConfiguration.HADOOP) + .role(HadoopServiceDescriptor.GATEWAY) + .instance(0) } - void env(Map values) { - env.putAll(values) + @Override + Map taskEnvironmentVariables() { + return [:] } @TaskAction @@ -125,10 +121,7 @@ class DfsCopy extends AbstractClusterTask { } // Gateway conf - InstanceConfiguration hadoopGateway = clusterConfiguration - .service(HadoopClusterConfiguration.HADOOP) - .role(HadoopServiceDescriptor.GATEWAY) - .instance(0) + InstanceConfiguration hadoopGateway = getInstance() // Determine command File baseDir = hadoopGateway.getBaseDir() @@ -153,9 +146,7 @@ class DfsCopy extends AbstractClusterTask { } // Combine env and sysprops - Map finalEnv = hadoopGateway.getEnvironmentVariables() - hadoopGateway.getServiceDescriptor().finalizeEnv(finalEnv, hadoopGateway) - finalEnv.putAll(env) + Map finalEnv = collectEnvVars() // First ensure destination directories exist if (dfsDestination != null) { diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/HadoopMRJob.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/HadoopMRJob.groovy index 186a8d263..59f73062e 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/HadoopMRJob.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/HadoopMRJob.groovy @@ -26,24 +26,29 @@ import org.gradle.api.GradleException import org.gradle.api.tasks.TaskAction import org.gradle.process.ExecSpec +import static org.elasticsearch.hadoop.gradle.util.ObjectUtil.unapplyString + class HadoopMRJob extends AbstractClusterTask { String jobClass File jobJar - Map jobSettings = [:] + Map jobSettings = [:] List libJars = [] List args = [] Map systemProperties = [:] - Map environmentVariables = [:] - void jobSetting(String key, String value) { + void jobSetting(String key, Object value) { jobSettings.put(key, value) } - void jobSettings(Map settings) { + void jobSettings(Map settings) { jobSettings.putAll(settings) } + void libJars(File... 
files) { + libJars.addAll(files) + } + void systemProperty(String key, String value) { systemProperties.put(key, value) } @@ -52,6 +57,50 @@ class HadoopMRJob extends AbstractClusterTask { systemProperties.putAll(settings) } + @Override + InstanceConfiguration defaultInstance(HadoopClusterConfiguration clusterConfiguration) { + return clusterConfiguration + .service(HadoopClusterConfiguration.HADOOP) + .role(HadoopServiceDescriptor.GATEWAY) + .instance(0) + } + + @Override + Map taskEnvironmentVariables() { + Map taskEnv = [:] + InstanceConfiguration instance = getInstance() + + // Add lib jars to the user classpath if needed + if (!libJars.isEmpty()) { + taskEnv.put('YARN_USER_CLASSPATH', libJars.join(":")) + } + + // Collect the java properties and JVM options from the cluster configuration, as well as the system props from + // this specific task. Add them to the environment variable that Hadoop would expect them to be specified under. + String javaPropertyEnvVariable = instance.getServiceDescriptor().javaOptsEnvSetting(instance) + if (javaPropertyEnvVariable != null) { + List javaOpts = [taskEnv.get(javaPropertyEnvVariable, '')] + javaOpts.add(instance.getJvmArgs()) + for (Map propertyMap : [instance.getSystemProperties(), systemProperties]) { + String collectedSystemProperties = propertyMap + .collect { key, value -> "-D${key}=${value}" } + .join(" ") + if (!collectedSystemProperties.isEmpty()) { + javaOpts.add(collectedSystemProperties) + } + } + // Force the javaOptsEnvSetting to be the correct one for executing a hadoop jar command. + // We might be running this command "on a different instance" than gateway + // (it might be running on namenode, and be using namenode's env property names) + String hadoopJarCommandJavaPropertyEnvVariable = instance + .getServiceDescriptor() + .javaOptsEnvSetting(defaultInstance(clusterConfiguration)) + taskEnv.put(hadoopJarCommandJavaPropertyEnvVariable, javaOpts.join(" ").trim()) + } + + return taskEnv + } + @TaskAction void runYarnJar() { // Verification @@ -85,35 +134,13 @@ class HadoopMRJob extends AbstractClusterTask { commandLine.addAll(['-libjars', libJars.join(',')]) } if (!jobSettings.isEmpty()) { - commandLine.addAll(jobSettings.collect { k, v -> "-D${k}=${v}"}) + commandLine.addAll(jobSettings.collect { k, v -> "-D${k}=${unapplyString(v)}"}) } if (!args.isEmpty()) { commandLine.addAll(args) } - Map finalEnv = hadoopGateway.getEnvironmentVariables() - hadoopGateway.getServiceDescriptor().finalizeEnv(finalEnv, hadoopGateway) - - if (!libJars.isEmpty()) { - finalEnv.put('YARN_USER_CLASSPATH', libJars.join(":")) - } - - String javaPropertyEnvVariable = hadoopGateway.getServiceDescriptor().javaOptsEnvSetting(hadoopGateway) - if (javaPropertyEnvVariable != null) { - List javaOpts = [finalEnv.get(javaPropertyEnvVariable, '')] - javaOpts.add(hadoopGateway.getJvmArgs()) - for (Map propertyMap : [hadoopGateway.getSystemProperties(), systemProperties]) { - String collectedSystemProperties = propertyMap - .collect { key, value -> "-D${key}=${value}" } - .join(" ") - if (!collectedSystemProperties.isEmpty()) { - javaOpts.add(collectedSystemProperties) - } - } - finalEnv.put('YARN_OPTS', javaOpts.join(" ")) - } - - finalEnv.putAll(environmentVariables) + Map finalEnv = collectEnvVars() // Do command project.logger.info("Executing Command: " + commandLine) diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/HiveBeeline.groovy 
b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/HiveBeeline.groovy index 1dce657e2..364030296 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/HiveBeeline.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/HiveBeeline.groovy @@ -26,12 +26,30 @@ import org.gradle.api.GradleException import org.gradle.api.tasks.TaskAction import org.gradle.process.ExecSpec +import static org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.SettingsContainer.FileSettings + class HiveBeeline extends AbstractClusterTask { File script List libJars = [] String hivePrincipal - Map env = [:] + + void libJars(File... files) { + libJars.addAll(files) + } + + @Override + InstanceConfiguration defaultInstance(HadoopClusterConfiguration clusterConfiguration) { + return clusterConfiguration + .service(HadoopClusterConfiguration.HIVE) + .role(HiveServiceDescriptor.HIVESERVER) + .instance(0) + } + + @Override + Map taskEnvironmentVariables() { + return [:] + } @TaskAction void runBeeline() { @@ -41,10 +59,7 @@ class HiveBeeline extends AbstractClusterTask { } // Gateway conf - InstanceConfiguration hiveServer = clusterConfiguration - .service(HadoopClusterConfiguration.HIVE) - .role(HiveServiceDescriptor.HIVESERVER) - .instance(0) + InstanceConfiguration hiveServer = getInstance() File baseDir = hiveServer.getBaseDir() File homeDir = new File(baseDir, hiveServer.getServiceDescriptor().homeDirName(hiveServer)) @@ -63,10 +78,7 @@ class HiveBeeline extends AbstractClusterTask { commandLine.addAll(['-f', finalScript.toString()]) } - // Use the service descriptor to pick up HADOOP_HOME= - Map environment = hiveServer.getEnvironmentVariables() - hiveServer.getServiceDescriptor().finalizeEnv(environment, hiveServer) - environment.putAll(env) + Map environment = collectEnvVars() project.logger.info("Using Environment: $environment") project.exec { ExecSpec spec -> @@ -76,7 +88,7 @@ class HiveBeeline extends AbstractClusterTask { } static String getConnectionString(InstanceConfiguration hiveServer, String hivePrincipal) { - Map hiveconf = hiveServer + FileSettings hiveconf = hiveServer .getServiceDescriptor() .collectConfigFilesContents(hiveServer) .get('hive-site.xml') diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/PigScript.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/PigScript.groovy index e3c5ce082..bda002b80 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/PigScript.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/PigScript.groovy @@ -30,7 +30,23 @@ class PigScript extends AbstractClusterTask { File script List libJars = [] - Map env = [:] + + void libJars(File... 
files) { + libJars.addAll(files) + } + + @Override + InstanceConfiguration defaultInstance(HadoopClusterConfiguration clusterConfiguration) { + return clusterConfiguration + .service(HadoopClusterConfiguration.PIG) + .role(PigServiceDescriptor.GATEWAY) + .instance(0) + } + + @Override + Map taskEnvironmentVariables() { + return [:] + } @TaskAction void runPig() { @@ -43,10 +59,7 @@ class PigScript extends AbstractClusterTask { } // Gateway conf - InstanceConfiguration pigGateway = clusterConfiguration - .service(HadoopClusterConfiguration.PIG) - .role(PigServiceDescriptor.GATEWAY) - .instance(0) + InstanceConfiguration pigGateway = getInstance() File baseDir = pigGateway.getBaseDir() File homeDir = new File(baseDir, pigGateway.getServiceDescriptor().homeDirName(pigGateway)) @@ -67,9 +80,7 @@ class PigScript extends AbstractClusterTask { } // Use the service descriptor to pick up HADOOP_HOME= - Map environment = pigGateway.getEnvironmentVariables() - pigGateway.getServiceDescriptor().finalizeEnv(environment, pigGateway) - environment.putAll(env) + Map environment = collectEnvVars() // Additional env's // PIG_HEAPSIZE - In MB diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/SparkApp.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/SparkApp.groovy index e80d81cfb..7b82c40a6 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/SparkApp.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/SparkApp.groovy @@ -26,6 +26,8 @@ import org.gradle.api.GradleException import org.gradle.api.tasks.TaskAction import org.gradle.process.ExecSpec +import static org.elasticsearch.hadoop.gradle.util.ObjectUtil.unapplyString + class SparkApp extends AbstractClusterTask { enum Master { @@ -40,12 +42,11 @@ class SparkApp extends AbstractClusterTask { File jobJar Master master = Master.YARN DeployMode deployMode = DeployMode.CLIENT - Map jobSettings = [:] + Map jobSettings = [:] String principal String keytab List libJars = [] List args = [] - Map env = [:] void deployMode(DeployMode mode) { deployMode = mode @@ -59,14 +60,31 @@ class SparkApp extends AbstractClusterTask { deployMode = DeployMode.CLUSTER } - void jobSetting(String key, String value) { + void jobSetting(String key, Object value) { jobSettings.put(key, value) } - void jobSettings(Map configs) { + void jobSettings(Map configs) { jobSettings.putAll(configs) } + void libJars(File... 
files) { + libJars.addAll(files) + } + + @Override + InstanceConfiguration defaultInstance(HadoopClusterConfiguration clusterConfiguration) { + return clusterConfiguration + .service(HadoopClusterConfiguration.SPARK) + .role(SparkYarnServiceDescriptor.GATEWAY) + .instance(0) + } + + @Override + Map taskEnvironmentVariables() { + return [:] + } + @TaskAction void runSparkSubmit() { //Verification @@ -81,10 +99,7 @@ class SparkApp extends AbstractClusterTask { } // Gateway conf - InstanceConfiguration sparkGateway = clusterConfiguration - .service(HadoopClusterConfiguration.SPARK) - .role(SparkYarnServiceDescriptor.GATEWAY) - .instance(0) + InstanceConfiguration sparkGateway = getInstance() File baseDir = sparkGateway.getBaseDir() File homeDir = new File(baseDir, sparkGateway.getServiceDescriptor().homeDirName(sparkGateway)) @@ -113,7 +128,7 @@ class SparkApp extends AbstractClusterTask { commandLine.addAll(['--jars', libJars.join(',')]) } - jobSettings.collect { k, v -> /$k=$v/ }.forEach { conf -> commandLine.add('--conf'); commandLine.add(conf) } + jobSettings.collect { k, v -> /$k=${unapplyString(v)}/ }.forEach { conf -> commandLine.add('--conf'); commandLine.add(conf) } if (DeployMode.CLUSTER.equals(deployMode) && (principal != null || keytab != null)) { if (principal == null || keytab == null) { @@ -126,9 +141,7 @@ class SparkApp extends AbstractClusterTask { commandLine.addAll(args) // HADOOP_CONF_DIR=..../etc/hadoop - Map finalEnv = sparkGateway.getEnvironmentVariables() - sparkGateway.getServiceDescriptor().finalizeEnv(finalEnv, sparkGateway) - finalEnv.putAll(env) + Map finalEnv = collectEnvVars() // Do command project.logger.info("Command Env: " + finalEnv) diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/util/ObjectUtil.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/util/ObjectUtil.groovy new file mode 100644 index 000000000..cbebedef1 --- /dev/null +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/util/ObjectUtil.groovy @@ -0,0 +1,32 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.hadoop.gradle.util + +class ObjectUtil { + static String unapplyString(Object value) { + if (value == null) { + return null + } else if (value instanceof Closure) { + return ((Closure) value).call().toString() + } else { + return value.toString() + } + } +} diff --git a/qa/kerberos/build.gradle b/qa/kerberos/build.gradle index 4d7097dc7..3ddcfe994 100644 --- a/qa/kerberos/build.gradle +++ b/qa/kerberos/build.gradle @@ -84,7 +84,7 @@ dependencies { // Disable the integration tests for Kerberos until we can find a solution to the failures due to + sign // in the file path on CI. 
-boolean disableTests = true +boolean disableTests = false if (disableTests) { // Disable the integration tests for Kerberos until we can find a solution to the failures due to + sign // in the file path on CI. @@ -151,7 +151,7 @@ if (disableTests) { // Configure MiniKDC AntFixture kdcFixture = project.tasks.create('kdcFixture', AntFixture) { dependsOn project.configurations.kdcFixture - executable = new File(project.compilerJavaHome, 'bin/java') + executable = new File(project.runtimeJavaHome, 'bin/java') env 'CLASSPATH', "${ -> project.configurations.kdcFixture.asPath }" waitCondition = { fixture, ant -> // the kdc wrapper writes the ports file when @@ -218,7 +218,8 @@ if (disableTests) { extraConfigFile('es.keytab', esKeytab.toFile()) } - + def esAddress = "${-> testClusters.integTest.getAllHttpSocketURI().get(0)}" + // Configure Integration Test Task Test integrationTest = project.tasks.findByName('integrationTest') as Test integrationTest.dependsOn(kdcFixture) @@ -242,9 +243,10 @@ if (disableTests) { useCluster(testClusters.integTest) doLast { project.javaexec { + executable = project.runtimeJavaHome.toString() + "/bin/java" main = 'org.elasticsearch.hadoop.qa.kerberos.setup.SetupKerberosUsers' classpath = sourceSets.main.runtimeClasspath - systemProperty('es.nodes', 'localhost:9500') + systemProperty('es.nodes', esAddress) systemProperty('es.net.http.auth.user', 'test_admin') systemProperty('es.net.http.auth.pass', 'x-pack-test-password') systemProperty('principals', "$clientPrincipal$realm") @@ -265,6 +267,7 @@ if (disableTests) { // Hadoop cluster depends on KDC Fixture being up and running config.addDependency(kdcFixture) + config.useElasticsearchCluster(testClusters.integTest) config.service('hadoop') { ServiceConfiguration s -> s.addSystemProperty("java.security.krb5.conf", krb5Conf.toString()) @@ -330,7 +333,7 @@ if (disableTests) { r.addEnvironmentVariable('YARN_USER_CLASSPATH', testingJar.archivePath.toString()) r.settingsFile('yarn-site.xml') { SettingsContainer.FileSettings f -> // Add settings specifically for ES Node to allow for cancelling the tokens - f.addSetting('es.nodes', 'localhost:9500') + f.addSetting('es.nodes', esAddress) } } } @@ -344,6 +347,7 @@ if (disableTests) { s.addSetting('yarn.app.mapreduce.am.command-opts', "-Djava.security.krb5.conf=${krb5Conf.toString()}") s.addSetting('mapreduce.map.java.opts', "-Djava.security.krb5.conf=${krb5Conf.toString()}") s.addSetting('mapreduce.reduce.java.opts', "-Djava.security.krb5.conf=${krb5Conf.toString()}") + s.addSetting('es.nodes', esAddress) } config.service('pig') { ServiceConfiguration s -> s.addSetting('java.security.krb5.conf', krb5Conf.toString()) @@ -352,6 +356,7 @@ if (disableTests) { s.addSetting('yarn.app.mapreduce.am.command-opts', "-Djava.security.krb5.conf=${krb5Conf.toString()}") s.addSetting('mapreduce.map.java.opts', "-Djava.security.krb5.conf=${krb5Conf.toString()}") s.addSetting('mapreduce.reduce.java.opts', "-Djava.security.krb5.conf=${krb5Conf.toString()}") + s.addSetting('es.nodes', esAddress) } config.addDependency(jar) config.addDependency(testingJar) @@ -417,7 +422,6 @@ if (disableTests) { "test.krb5.keytab": clientKeytab.toString(), "java.security.krb5.conf": krb5Conf.toString() ]) - environmentVariables.put('HADOOP_ROOT_LOGGER','TRACE,console') args = ['-copyFromLocal', project.file(new File(mrItestResourceDir, 'artists.dat')), "/data/artists/artists.dat"] } @@ -428,20 +432,21 @@ if (disableTests) { // Run the MR job to load data to ES. Ensure Kerberos settings are available. 
HadoopMRJob mrLoadData = config.createClusterTask('mrLoadData', HadoopMRJob.class) { clusterConfiguration = config + useCluster(testClusters.integTest) dependsOn(copyData, setupUsers) jobJar = jar.archivePath - libJars.add(testingJar.archivePath) + libJars(testingJar.archivePath) jobClass = 'org.elasticsearch.hadoop.qa.kerberos.mr.LoadToES' - jobSettings = [ + jobSettings([ 'es.resource': 'qa_kerberos_mr_data', - 'es.nodes': 'localhost:9500', + 'es.nodes': esAddress, 'es.security.authentication': 'kerberos', 'es.net.spnego.auth.elasticsearch.principal': "${esPrincipal}${realm}", 'load.field.names': 'number,name,url,picture,@timestamp,tag', 'mapreduce.map.java.opts': "-Djava.security.krb5.conf=${krb5Conf.toString()}", 'mapreduce.reduce.java.opts': "-Djava.security.krb5.conf=${krb5Conf.toString()}", 'yarn.app.mapreduce.am.command-opts': "-Djava.security.krb5.conf=${krb5Conf.toString()}" - ] + ]) systemProperties([ "test.krb5.principal": clientPrincipal, "test.krb5.keytab": clientKeytab.toString(), @@ -454,19 +459,20 @@ if (disableTests) { // Run the MR job to read data out of ES. Ensure Kerberos settings are available. HadoopMRJob mrReadData = config.createClusterTask('mrReadData', HadoopMRJob.class) { clusterConfiguration = config + useCluster(testClusters.integTest) dependsOn(mrLoadData) jobJar = jar.archivePath - libJars.add(testingJar.archivePath) + libJars(testingJar.archivePath) jobClass = 'org.elasticsearch.hadoop.qa.kerberos.mr.ReadFromES' - jobSettings = [ + jobSettings([ 'es.resource': 'qa_kerberos_mr_data', - 'es.nodes': 'localhost:9500', + 'es.nodes': esAddress, 'es.security.authentication': 'kerberos', 'es.net.spnego.auth.elasticsearch.principal': "${esPrincipal}${realm}", 'mapreduce.map.java.opts': "-Djava.security.krb5.conf=${krb5Conf.toString()}", 'mapreduce.reduce.java.opts': "-Djava.security.krb5.conf=${krb5Conf.toString()}", 'yarn.app.mapreduce.am.command-opts': "-Djava.security.krb5.conf=${krb5Conf.toString()}" - ] + ]) systemProperties([ "test.krb5.principal": clientPrincipal, "test.krb5.keytab": clientKeytab.toString(), @@ -483,16 +489,17 @@ if (disableTests) { // Run the Spark job to load data to ES. Ensure Kerberos settings are available. 
SparkApp sparkLoadData = config.createClusterTask('sparkLoadData', SparkApp.class) { clusterConfiguration = config + useCluster(testClusters.integTest) dependsOn(copyData) // deployModeCluster() // principal = clientPrincipal + realm // keytab = clientKeytab.toString() jobJar = jar.archivePath - libJars.add(testingJar.archivePath) + libJars(testingJar.archivePath) jobClass = 'org.elasticsearch.hadoop.qa.kerberos.spark.LoadToES' jobSettings([ 'spark.es.resource': 'qa_kerberos_spark_data', - 'spark.es.nodes': 'localhost:9500', + 'spark.es.nodes': esAddress, 'spark.es.security.authentication': 'kerberos', 'spark.es.net.spnego.auth.elasticsearch.principal': "${esPrincipal}${realm}", 'spark.load.field.names': 'number,name,url,picture,@timestamp,tag', @@ -500,7 +507,7 @@ if (disableTests) { 'spark.driver.extraJavaOptions': "-Djava.security.krb5.conf=${krb5Conf.toString()}", 'spark.executor.extraJavaOptions': "-Djava.security.krb5.conf=${krb5Conf.toString()}" ]) - env.put('SPARK_SUBMIT_OPTS', "-Djava.security.krb5.conf=${krb5Conf.toString()} " + + environmentVariables.put('SPARK_SUBMIT_OPTS', "-Djava.security.krb5.conf=${krb5Conf.toString()} " + "-Dtest.krb5.principal=$clientPrincipal$realm " + "-Dtest.krb5.keytab=${clientKeytab.toString()}") args = ['/data/artists/artists.dat'] @@ -510,23 +517,24 @@ if (disableTests) { // Run the Spark job to load data to ES. Ensure Kerberos settings are available. SparkApp sparkReadData = config.createClusterTask('sparkReadData', SparkApp.class) { clusterConfiguration = config + useCluster(testClusters.integTest) dependsOn(sparkLoadData) // deployModeCluster() // principal = clientPrincipal + realm // keytab = clientKeytab.toString() jobJar = jar.archivePath - libJars.add(testingJar.archivePath) + libJars(testingJar.archivePath) jobClass = 'org.elasticsearch.hadoop.qa.kerberos.spark.ReadFromES' jobSettings([ 'spark.es.resource': 'qa_kerberos_spark_data', - 'spark.es.nodes': 'localhost:9500', + 'spark.es.nodes': esAddress, 'spark.es.security.authentication': 'kerberos', 'spark.es.net.spnego.auth.elasticsearch.principal': "${esPrincipal}${realm}", 'spark.yarn.am.extraJavaOptions': "-Djava.security.krb5.conf=${krb5Conf.toString()}", 'spark.driver.extraJavaOptions': "-Djava.security.krb5.conf=${krb5Conf.toString()}", 'spark.executor.extraJavaOptions': "-Djava.security.krb5.conf=${krb5Conf.toString()}" ]) - env.put('SPARK_SUBMIT_OPTS', "-Djava.security.krb5.conf=${krb5Conf.toString()} " + + environmentVariables.put('SPARK_SUBMIT_OPTS', "-Djava.security.krb5.conf=${krb5Conf.toString()} " + "-Dtest.krb5.principal=$clientPrincipal$realm " + "-Dtest.krb5.keytab=${clientKeytab.toString()}") args = ['/data/output/spark'] @@ -553,11 +561,12 @@ if (disableTests) { HiveBeeline hiveLoadData = config.createClusterTask('hiveLoadData', HiveBeeline.class) { clusterConfiguration = config + useCluster(testClusters.integTest) dependsOn(jar, setupUsers, copyData, patchBeeline) hivePrincipal = hivePrincipalName + realm script = new File(resourceDir, 'hive/load_to_es.sql') - libJars.add(testingJar.archivePath) - env.putAll([ + libJars(testingJar.archivePath) + environmentVariables.putAll([ 'HADOOP_CLIENT_OPTS': "-Djava.security.krb5.conf=${krb5Conf.toString()} " + "-Dtest.krb5.principal=$clientPrincipal$realm " + @@ -569,11 +578,12 @@ if (disableTests) { HiveBeeline hiveReadData = config.createClusterTask('hiveReadData', HiveBeeline.class) { clusterConfiguration = config + useCluster(testClusters.integTest) dependsOn(hiveLoadData) hivePrincipal = hivePrincipalName + realm script = new 
File(resourceDir, 'hive/read_from_es.sql') - libJars.add(testingJar.archivePath) - env.putAll([ + libJars(testingJar.archivePath) + environmentVariables.putAll([ 'HADOOP_CLIENT_OPTS': "-Djava.security.krb5.conf=${krb5Conf.toString()} " + "-Dtest.krb5.principal=$clientPrincipal$realm " + @@ -589,23 +599,25 @@ if (disableTests) { PigScript pigLoadData = config.createClusterTask('pigLoadData', PigScript.class) { clusterConfiguration = config + useCluster(testClusters.integTest) dependsOn(jar, setupUsers, copyData) script = new File(resourceDir, 'pig/load_to_es.pig') - libJars.add(testingJar.archivePath) - env = [ + libJars(testingJar.archivePath) + environmentVariables.putAll([ 'PIG_OPTS': "-Djava.security.krb5.conf=${krb5Conf.toString()}" - ] + ]) } integrationTest.dependsOn(pigLoadData) PigScript pigReadData = config.createClusterTask('pigReadData', PigScript.class) { clusterConfiguration = config + useCluster(testClusters.integTest) dependsOn(pigLoadData) script = new File(resourceDir, 'pig/read_from_es.pig') - libJars.add(testingJar.archivePath) - env = [ + libJars(testingJar.archivePath) + environmentVariables.putAll([ 'PIG_OPTS': "-Djava.security.krb5.conf=${krb5Conf.toString()}" - ] + ]) } integrationTest.dependsOn(pigReadData) @@ -644,7 +656,6 @@ if (disableTests) { "test.krb5.keytab": clientKeytab.toString(), "java.security.krb5.conf": krb5Conf.toString() ]) - environmentVariables.put('HADOOP_ROOT_LOGGER','TRACE,console') args = ['-copyToLocal', "/data/output/$integrationName", outputDataDir] } // Integration test needs to depend on copy output tasks diff --git a/qa/kerberos/src/main/java/org/elasticsearch/hadoop/qa/kerberos/setup/SetupKerberosUsers.java b/qa/kerberos/src/main/java/org/elasticsearch/hadoop/qa/kerberos/setup/SetupKerberosUsers.java index 8f303ab55..6840f4d0e 100644 --- a/qa/kerberos/src/main/java/org/elasticsearch/hadoop/qa/kerberos/setup/SetupKerberosUsers.java +++ b/qa/kerberos/src/main/java/org/elasticsearch/hadoop/qa/kerberos/setup/SetupKerberosUsers.java @@ -19,8 +19,6 @@ package org.elasticsearch.hadoop.qa.kerberos.setup; -import java.util.Properties; - import org.apache.commons.logging.LogFactory; import org.elasticsearch.hadoop.cfg.PropertiesSettings; import org.elasticsearch.hadoop.cfg.Settings; diff --git a/qa/kerberos/src/main/java/org/elasticsearch/hadoop/qa/kerberos/storm/StreamFromEs.java b/qa/kerberos/src/main/java/org/elasticsearch/hadoop/qa/kerberos/storm/StreamFromEs.java index 4efa6059f..1a3c4b570 100644 --- a/qa/kerberos/src/main/java/org/elasticsearch/hadoop/qa/kerberos/storm/StreamFromEs.java +++ b/qa/kerberos/src/main/java/org/elasticsearch/hadoop/qa/kerberos/storm/StreamFromEs.java @@ -37,12 +37,13 @@ public class StreamFromEs { public static void main(String[] args) throws Exception { final String submitPrincipal = args[0]; final String submitKeytab = args[1]; + final String esNodes = args[2]; LoginContext loginContext = LoginUtil.keytabLogin(submitPrincipal, submitKeytab); try { Subject.doAs(loginContext.getSubject(), new PrivilegedExceptionAction() { @Override public Void run() throws Exception { - submitJob(submitPrincipal, submitKeytab); + submitJob(submitPrincipal, submitKeytab, esNodes); return null; } }); @@ -51,7 +52,7 @@ public Void run() throws Exception { } } - public static void submitJob(String principal, String keytab) throws Exception { + public static void submitJob(String principal, String keytab, String esNodes) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("ES", new 
EsSpout("storm-test")); builder.setBolt("Output", new CapturingBolt()).shuffleGrouping("ES"); @@ -62,7 +63,7 @@ public static void submitJob(String principal, String keytab) throws Exception { List plugins = new ArrayList(); plugins.add(AutoElasticsearch.class.getName()); conf.put(Config.TOPOLOGY_AUTO_CREDENTIALS, plugins); - conf.put(ConfigurationOptions.ES_PORT, "9500"); + conf.put(ConfigurationOptions.ES_NODES, esNodes); conf.put(ConfigurationOptions.ES_SECURITY_AUTHENTICATION, "kerberos"); conf.put(ConfigurationOptions.ES_NET_SPNEGO_AUTH_ELASTICSEARCH_PRINCIPAL, "HTTP/build.elastic.co@BUILD.ELASTIC.CO"); conf.put(ConfigurationOptions.ES_INPUT_JSON, "true"); diff --git a/qa/kerberos/src/main/java/org/elasticsearch/hadoop/qa/kerberos/storm/StreamToEs.java b/qa/kerberos/src/main/java/org/elasticsearch/hadoop/qa/kerberos/storm/StreamToEs.java index 19f004eaa..025706a80 100644 --- a/qa/kerberos/src/main/java/org/elasticsearch/hadoop/qa/kerberos/storm/StreamToEs.java +++ b/qa/kerberos/src/main/java/org/elasticsearch/hadoop/qa/kerberos/storm/StreamToEs.java @@ -41,12 +41,13 @@ public class StreamToEs { public static void main(String[] args) throws Exception { final String submitPrincipal = args[0]; final String submitKeytab = args[1]; + final String esNodes = args[2]; LoginContext loginContext = LoginUtil.keytabLogin(submitPrincipal, submitKeytab); try { Subject.doAs(loginContext.getSubject(), new PrivilegedExceptionAction() { @Override public Void run() throws Exception { - submitJob(submitPrincipal, submitKeytab); + submitJob(submitPrincipal, submitKeytab, esNodes); return null; } }); @@ -55,7 +56,7 @@ public Void run() throws Exception { } } - public static void submitJob(String principal, String keytab) throws Exception { + public static void submitJob(String principal, String keytab, String esNodes) throws Exception { List doc1 = Collections.singletonList("{\"reason\" : \"business\",\"airport\" : \"SFO\"}"); List doc2 = Collections.singletonList("{\"participants\" : 5,\"airport\" : \"OTP\"}"); @@ -71,7 +72,7 @@ public static void submitJob(String principal, String keytab) throws Exception { List plugins = new ArrayList(); plugins.add(AutoElasticsearch.class.getName()); conf.put(Config.TOPOLOGY_AUTO_CREDENTIALS, plugins); - conf.put(ConfigurationOptions.ES_PORT, "9500"); + conf.put(ConfigurationOptions.ES_NODES, esNodes); conf.put(ConfigurationOptions.ES_SECURITY_AUTHENTICATION, "kerberos"); conf.put(ConfigurationOptions.ES_NET_SPNEGO_AUTH_ELASTICSEARCH_PRINCIPAL, "HTTP/build.elastic.co@BUILD.ELASTIC.CO"); conf.put(ConfigurationOptions.ES_INPUT_JSON, "true"); diff --git a/qa/kerberos/src/main/resources/hive/load_to_es.sql b/qa/kerberos/src/main/resources/hive/load_to_es.sql index 51b6519f0..f4d485399 100644 --- a/qa/kerberos/src/main/resources/hive/load_to_es.sql +++ b/qa/kerberos/src/main/resources/hive/load_to_es.sql @@ -26,7 +26,6 @@ CREATE EXTERNAL TABLE IF NOT EXISTS es_artist_data ( STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES( 'es.resource' = 'qa_kerberos_hive_data', - 'es.nodes' = 'localhost:9500', 'es.security.authentication' = 'kerberos', 'es.net.spnego.auth.elasticsearch.principal' = 'HTTP/build.elastic.co@BUILD.ELASTIC.CO' ); diff --git a/qa/kerberos/src/main/resources/hive/read_from_es.sql b/qa/kerberos/src/main/resources/hive/read_from_es.sql index 0959b3e1d..280c4a833 100644 --- a/qa/kerberos/src/main/resources/hive/read_from_es.sql +++ b/qa/kerberos/src/main/resources/hive/read_from_es.sql @@ -8,7 +8,7 @@ CREATE EXTERNAL TABLE IF NOT EXISTS 
es_artist_data ( ts TIMESTAMP, tag STRING) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' -TBLPROPERTIES('es.resource' = 'qa_kerberos_hive_data', 'es.nodes' = 'localhost:9500'); +TBLPROPERTIES('es.resource' = 'qa_kerberos_hive_data'); DROP TABLE IF EXISTS artist_data; diff --git a/qa/kerberos/src/main/resources/pig/load_to_es.pig b/qa/kerberos/src/main/resources/pig/load_to_es.pig index 6aec1af48..425eac16f 100644 --- a/qa/kerberos/src/main/resources/pig/load_to_es.pig +++ b/qa/kerberos/src/main/resources/pig/load_to_es.pig @@ -1,7 +1,6 @@ A = LOAD '/data/artists' USING PigStorage('\t') AS (number: chararray, name: chararray, uri: chararray, picture: chararray, timestamp: chararray, tag: chararray); STORE A INTO 'qa_kerberos_pig_data' USING org.elasticsearch.hadoop.pig.EsStorage( - 'es.nodes = localhost:9500', 'es.security.authentication = kerberos', 'es.net.spnego.auth.elasticsearch.principal = HTTP/build.elastic.co@BUILD.ELASTIC.CO' ); \ No newline at end of file diff --git a/qa/kerberos/src/main/resources/pig/read_from_es.pig b/qa/kerberos/src/main/resources/pig/read_from_es.pig index 51cc74632..6617c8ded 100644 --- a/qa/kerberos/src/main/resources/pig/read_from_es.pig +++ b/qa/kerberos/src/main/resources/pig/read_from_es.pig @@ -1,5 +1,4 @@ A = LOAD 'qa_kerberos_pig_data' USING org.elasticsearch.hadoop.pig.EsStorage( - 'es.nodes = localhost:9500', 'es.security.authentication = kerberos', 'es.net.spnego.auth.elasticsearch.principal = HTTP/build.elastic.co@BUILD.ELASTIC.CO' ); diff --git a/test/fixtures/minikdc/build.gradle b/test/fixtures/minikdc/build.gradle index 1e3fa5117..3dcd6fc7a 100644 --- a/test/fixtures/minikdc/build.gradle +++ b/test/fixtures/minikdc/build.gradle @@ -31,6 +31,10 @@ dependencies { } } +// Target Java 1.8 compilation +sourceCompatibility = '1.8' +targetCompatibility = '1.8' + // for testing, until fixture are actually debuggable. // gradle hides EVERYTHING so you have no clue what went wrong. task kdc(type: JavaExec) { From cb8cce8688da629f51ba3e5ed577813d4f1be98b Mon Sep 17 00:00:00 2001 From: James Baiera Date: Tue, 25 Feb 2020 14:14:12 -0500 Subject: [PATCH 2/4] Simplify task creation process for instances. --- .../hadoop/HadoopClusterFormationTasks.groovy | 74 ++++++------------- 1 file changed, 24 insertions(+), 50 deletions(-) diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/HadoopClusterFormationTasks.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/HadoopClusterFormationTasks.groovy index 9557d03a2..4722f344d 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/HadoopClusterFormationTasks.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/HadoopClusterFormationTasks.groovy @@ -52,12 +52,11 @@ import static org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.SettingsContai class HadoopClusterFormationTasks { /** - * A start and stop task for an instance, as well as any and all instance specific setup and teardown tasks + * A start and stop task for a fixture */ - static class InstanceTasks { + static class TaskPair { Task startTask Task stopTask - List allTasks } /** @@ -73,7 +72,7 @@ class HadoopClusterFormationTasks { *

* Returns a list of NodeInfo objects for each node in the cluster. * - * Based on {@link org.elasticsearch.gradle.test.ClusterFormationTasks} + * Based on (now removed) org.elasticsearch.gradle.test.ClusterFormationTasks */ static List setup(Project project, HadoopClusterConfiguration clusterConfiguration) { String prefix = clusterConfiguration.getName() @@ -103,20 +102,20 @@ class HadoopClusterFormationTasks { List nodes = [] // Create the fixtures for each service - List clusterTaskPairs = [] + List clusterTaskPairs = [] for (ServiceConfiguration serviceConfiguration : clusterConfiguration.getServices()) { // Get the download task for this service's package and add it to the service's dependency tasks DistributionTasks distributionTasks = getOrConfigureDistributionDownload(project, serviceConfiguration) // Keep track of the start tasks in this service - List serviceTaskPairs = [] + List serviceTaskPairs = [] // Create fixtures for each role in the service for (RoleConfiguration roleConfiguration : serviceConfiguration.getRoles()) { // Keep track of the start tasks in this role - List roleTaskPairs = [] + List roleTaskPairs = [] // Create fixtures for each instance in the role for (InstanceConfiguration instanceConfiguration : roleConfiguration.getInstances()) { @@ -140,7 +139,7 @@ class HadoopClusterFormationTasks { nodes.add(instanceInfo) // Create the tasks for the instance - InstanceTasks instanceTasks + TaskPair instanceTasks try { instanceTasks = configureNode(project, prefix, instanceDependencies, instanceInfo, distributionTasks) @@ -172,16 +171,6 @@ class HadoopClusterFormationTasks { } } } - - // Check to see if any of the instance tasks are test cluster aware, and if they are, set the - // es cluster to be whichever cluster was configured, if any - if (instanceInfo.elasticsearchCluster != null) { - for (Task instanceTask : instanceTasks.allTasks) { - if (instanceTask instanceof DefaultTestClustersTask) { - ((DefaultTestClustersTask) instanceTask).useCluster(instanceInfo.elasticsearchCluster) - } - } - } } // Make each task in the role depend on and also be finalized by each instance in the service. 
List startTasks = roleTaskPairs.collect{it.startTask} @@ -248,44 +237,31 @@ class HadoopClusterFormationTasks { return new DistributionTasks(download: downloadTask, verify: verifyTask) } - static InstanceTasks configureNode(Project project, String prefix, Object dependsOn, InstanceInfo node, - DistributionTasks distribution) { - List instanceTasks = [] - - Task clean = project.tasks.create(name: taskName(prefix, node, 'clean'), type: Delete, dependsOn: dependsOn) { + static TaskPair configureNode(Project project, String prefix, Object dependsOn, InstanceInfo node, + DistributionTasks distribution) { + Task setup = project.tasks.create(name: taskName(prefix, node, 'clean'), type: Delete, dependsOn: dependsOn) { delete node.homeDir delete node.cwd group = 'hadoopFixture' } - instanceTasks.add(clean) // Only create CWD and check previous if the role is an executable process - Task lastInitTask = clean if (node.getConfig().getRoleDescriptor().isExecutableProcess()) { - Task createCwd = project.tasks.create(name: taskName(prefix, node, 'createCwd'), type: DefaultTask, dependsOn: clean) { + setup = project.tasks.create(name: taskName(prefix, node, 'createCwd'), type: DefaultTask, dependsOn: setup) { doLast { node.cwd.mkdirs() } outputs.dir node.cwd group = 'hadoopFixture' } - Task checkPrevious = configureCheckPreviousTask(taskName(prefix, node, 'checkPrevious'), project, createCwd, node) - Task stopPrevious = configureStopTask(taskName(prefix, node, 'stopPrevious'), project, checkPrevious, node) - lastInitTask = stopPrevious - - instanceTasks.add(createCwd) - instanceTasks.add(checkPrevious) - instanceTasks.add(stopPrevious) + setup = configureCheckPreviousTask(taskName(prefix, node, 'checkPrevious'), project, setup, node) + setup = configureStopTask(taskName(prefix, node, 'stopPrevious'), project, setup, node) } // Always extract the package contents, and configure the files - Task extract = configureExtractTask(taskName(prefix, node, 'extract'), project, lastInitTask, node, distribution) - Task configure = configureWriteConfigTask(taskName(prefix, node, 'configure'), project, extract, node) - Task extraConfig = configureExtraConfigFilesTask(taskName(prefix, node, 'extraConfig'), project, configure, node) - - instanceTasks.add(extract) - instanceTasks.add(configure) - instanceTasks.add(extraConfig) + setup = configureExtractTask(taskName(prefix, node, 'extract'), project, setup, node, distribution) + setup = configureWriteConfigTask(taskName(prefix, node, 'configure'), project, setup, node) + setup = configureExtraConfigFilesTask(taskName(prefix, node, 'extraConfig'), project, setup, node) // If the role for this instance is not a process, we skip creating start and stop tasks for it. 
if (!node.getConfig().getRoleDescriptor().isExecutableProcess()) { @@ -294,16 +270,15 @@ class HadoopClusterFormationTasks { for (Object dependency : node.config.getDependencies()) { if (dependency instanceof Fixture) { def depStop = ((Fixture)dependency).stopTask - extraConfig.finalizedBy(depStop) + setup.finalizedBy(depStop) } } - return new InstanceTasks(startTask: extraConfig, allTasks: instanceTasks) + return new TaskPair(startTask: setup) } Map setupCommands = new LinkedHashMap<>() setupCommands.putAll(node.config.getServiceDescriptor().defaultSetupCommands(node.config)) setupCommands.putAll(node.config.getSetupCommands()) - Task lastSetupCommand = extraConfig for (Map.Entry command : setupCommands) { // the first argument is the actual script name, relative to home Object[] args = command.getValue().clone() @@ -323,21 +298,17 @@ class HadoopClusterFormationTasks { commandPath = node.homeDir.toPath().resolve(args[0].toString()).toString() } args[0] = commandPath - lastSetupCommand = configureExecTask(taskName(prefix, node, command.getKey()), project, lastSetupCommand, node, args) - instanceTasks.add(lastSetupCommand) + setup = configureExecTask(taskName(prefix, node, command.getKey()), project, setup, node, args) } // Configure daemon start task - Task start = configureStartTask(taskName(prefix, node, 'start'), project, lastSetupCommand, node) - instanceTasks.add(start) + Task start = configureStartTask(taskName(prefix, node, 'start'), project, setup, node) // Configure wait task Task wait = configureWaitTask(taskName(prefix, node, 'wait'), project, node, start, 30) - instanceTasks.add(wait) // Configure daemon stop task Task stop = configureStopTask(taskName(prefix, node, 'stop'), project, [], node) - instanceTasks.add(stop) // We're running in the background, so make sure that the stop command is called after all cluster tasks finish wait.finalizedBy(stop) @@ -351,7 +322,7 @@ class HadoopClusterFormationTasks { stop.finalizedBy(depStop) } } - return new InstanceTasks(startTask: wait, stopTask: stop, allTasks: instanceTasks) + return new TaskPair(startTask: wait, stopTask: stop) } static Task configureCheckPreviousTask(String name, Project project, Task setup, InstanceInfo node) { @@ -376,6 +347,9 @@ class HadoopClusterFormationTasks { // Add all node level configs to node Configuration return project.tasks.create(name: name, type: DefaultTestClustersTask, dependsOn: setup) { group = 'hadoopFixture' + if (node.elasticsearchCluster != null) { + useCluster(node.elasticsearchCluster) + } doFirst { // Write each config file needed node.configFiles.forEach { configFile -> From 4de9de352325bb708151654c780ca6bbbc10376d Mon Sep 17 00:00:00 2001 From: James Baiera Date: Tue, 25 Feb 2020 14:45:02 -0500 Subject: [PATCH 3/4] Re-disable the kerberos tests until CI is ready to go --- qa/kerberos/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qa/kerberos/build.gradle b/qa/kerberos/build.gradle index 3ddcfe994..b18c2d74a 100644 --- a/qa/kerberos/build.gradle +++ b/qa/kerberos/build.gradle @@ -84,7 +84,7 @@ dependencies { // Disable the integration tests for Kerberos until we can find a solution to the failures due to + sign // in the file path on CI. -boolean disableTests = false +boolean disableTests = true if (disableTests) { // Disable the integration tests for Kerberos until we can find a solution to the failures due to + sign // in the file path on CI. 
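The wiring pattern this series applies throughout qa/kerberos/build.gradle is to capture the Elasticsearch test cluster's HTTP address as a lazy GString and to have each Hadoop-side task both declare the cluster with useCluster(...) and pass that address through its settings. A minimal sketch of the pattern (the task and index names here are illustrative, not copied from the build file):

    // The GString resolves the cluster's HTTP address only when it is read,
    // i.e. once the test cluster is up and the configuration is being written.
    def esAddress = "${ -> testClusters.integTest.getAllHttpSocketURI().get(0) }"

    HadoopMRJob exampleJob = config.createClusterTask('exampleJob', HadoopMRJob.class) {
        clusterConfiguration = config
        useCluster(testClusters.integTest)   // keep Elasticsearch running while this task executes
        jobJar = jar.archivePath
        jobSettings([
            'es.resource': 'example_index',  // illustrative index name
            'es.nodes'   : esAddress         // resolved only at configuration-writing time
        ])
    }
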
From c32ff583970d3ec38ab2a2f56c199253f3b847b9 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Mon, 9 Mar 2020 12:26:00 -0400 Subject: [PATCH 4/4] PR Suggestions --- .../gradle/fixture/hadoop/HadoopClusterFormationTasks.groovy | 2 -- .../org/elasticsearch/hadoop/gradle/util/ObjectUtil.groovy | 4 +++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/HadoopClusterFormationTasks.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/HadoopClusterFormationTasks.groovy index 4722f344d..a0f024a02 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/HadoopClusterFormationTasks.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/HadoopClusterFormationTasks.groovy @@ -71,8 +71,6 @@ class HadoopClusterFormationTasks { * Adds dependent tasks to the given task to start and stop a cluster with the given configuration. *

* Returns a list of NodeInfo objects for each node in the cluster. - * - * Based on (now removed) org.elasticsearch.gradle.test.ClusterFormationTasks */ static List setup(Project project, HadoopClusterConfiguration clusterConfiguration) { String prefix = clusterConfiguration.getName() diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/util/ObjectUtil.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/util/ObjectUtil.groovy index cbebedef1..cad9f57a6 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/util/ObjectUtil.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/util/ObjectUtil.groovy @@ -19,11 +19,13 @@ package org.elasticsearch.hadoop.gradle.util +import java.util.concurrent.Callable + class ObjectUtil { static String unapplyString(Object value) { if (value == null) { return null - } else if (value instanceof Closure) { + } else if (value instanceof Callable) { return ((Closure) value).call().toString() } else { return value.toString()
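
For reference, a small usage sketch of ObjectUtil.unapplyString (the values are made up, and it assumes the buildSrc class is on the build script classpath). Groovy Closures implement java.util.concurrent.Callable, so the broadened instanceof check above still handles plain closures through the Closure cast, while lazy GStrings fall through to toString(), which evaluates their embedded closures at that point:

    import org.elasticsearch.hadoop.gradle.util.ObjectUtil

    def lazyAddress = "${ -> 'localhost:9200' }"                       // stand-in for a test cluster address
    assert ObjectUtil.unapplyString(lazyAddress) == 'localhost:9200'   // GString resolved via toString()
    assert ObjectUtil.unapplyString({ 'deferred' }) == 'deferred'      // Closure handled by the Callable branch
    assert ObjectUtil.unapplyString(9200) == '9200'                    // plain values pass through toString()
    assert ObjectUtil.unapplyString(null) == null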