diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/ConfigFormats.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/ConfigFormats.groovy
index 84e7519c7..8cd6f3b91 100644
--- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/ConfigFormats.groovy
+++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/ConfigFormats.groovy
@@ -19,11 +19,13 @@
 
 package org.elasticsearch.hadoop.gradle.fixture.hadoop
 
+import static org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.SettingsContainer.FileSettings
+
 class ConfigFormats {
 
     static Closure<String> hadoopXML() {
-        return { Map<String, String> conf ->
-            String props = conf.collect { key, value ->
+        return { FileSettings conf ->
+            String props = conf.resolve().collect { key, value ->
                 "<property>\n\t\t<name>${key}</name>\n\t\t<value>${value}</value>\n\t</property>"
             }.join("\n\t")
             return "<configuration>\n\t${props}\n</configuration>"
@@ -31,16 +33,16 @@ class ConfigFormats {
     }
 
     static Closure<String> propertyFile() {
-        return { Map<String, String> conf ->
-            conf.collect { key, value ->
+        return { FileSettings conf ->
+            conf.resolve().collect { key, value ->
                 "${key}=${value}"
             }.join("\n")
         }
     }
 
     static Closure<String> whiteSpaced() {
-        return { Map<String, String> conf ->
-            conf.collect { key, value ->
+        return { FileSettings conf ->
+            conf.resolve().collect { key, value ->
                 "${key} ${value}"
             }.join("\n")
         }
diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/HadoopClusterFormationTasks.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/HadoopClusterFormationTasks.groovy
index fe6548d1a..a0f024a02 100644
--- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/HadoopClusterFormationTasks.groovy
+++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/HadoopClusterFormationTasks.groovy
@@ -23,6 +23,7 @@ import org.apache.tools.ant.DefaultLogger
 import org.elasticsearch.gradle.LoggedExec
 import org.elasticsearch.gradle.Version
 import org.elasticsearch.gradle.test.Fixture
+import org.elasticsearch.gradle.testclusters.DefaultTestClustersTask
 import org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.HadoopClusterConfiguration
 import org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.InstanceConfiguration
 import org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.RoleConfiguration
@@ -42,6 +43,8 @@ import org.apache.tools.ant.taskdefs.condition.Os
 
 import java.nio.file.Paths
 
+import static org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.SettingsContainer.FileSettings
+
 /**
  * A helper for creating tasks to build a cluster that is used by a task, and tear down the cluster
  * when the task is finished.
@@ -68,8 +71,6 @@ class HadoopClusterFormationTasks {
     /**
      * Adds dependent tasks to the given task to start and stop a cluster with the given configuration.
      * <p>
     * Returns a list of NodeInfo objects for each node in the cluster.
-     *
-     * Based on {@link org.elasticsearch.gradle.test.ClusterFormationTasks}
      */
    static List<InstanceInfo> setup(Project project, HadoopClusterConfiguration clusterConfiguration) {
        String prefix = clusterConfiguration.getName()
@@ -342,13 +343,16 @@
     static Task configureWriteConfigTask(String name, Project project, Task setup, InstanceInfo node) {
         // Add all node level configs to node Configuration
-        return project.tasks.create(name: name, type: DefaultTask, dependsOn: setup) {
+        return project.tasks.create(name: name, type: DefaultTestClustersTask, dependsOn: setup) {
             group = 'hadoopFixture'
+            if (node.elasticsearchCluster != null) {
+                useCluster(node.elasticsearchCluster)
+            }
             doFirst {
                 // Write each config file needed
                 node.configFiles.forEach { configFile ->
                     String configName = configFile.getName()
-                    Map<String, String> configFileEntries = node.configContents.get(configName)
+                    FileSettings configFileEntries = node.configContents.get(configName)
                     if (configFileEntries == null) {
                         throw new GradleException("Could not find contents of [${configFile}] settings file from deployment options.")
                     }
@@ -387,7 +391,6 @@ class HadoopClusterFormationTasks {
         return project.tasks.create(name: name, type: LoggedExec, dependsOn: setup) { Exec exec ->
             exec.group = 'hadoopFixture'
             exec.workingDir node.cwd
-            exec.environment 'JAVA_HOME', node.getJavaHome()
             exec.environment(node.env)
 
             // Configure HADOOP_OPTS (or similar env) - adds system properties, assertion flags, remote debug etc
diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/InstanceInfo.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/InstanceInfo.groovy
index 183aceeaf..92c39195b 100644
--- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/InstanceInfo.groovy
+++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/InstanceInfo.groovy
@@ -20,7 +20,7 @@
 package org.elasticsearch.hadoop.gradle.fixture.hadoop
 
 import org.apache.tools.ant.taskdefs.condition.Os
-import org.elasticsearch.gradle.info.BuildParams
+import org.elasticsearch.gradle.testclusters.ElasticsearchCluster
 import org.elasticsearch.hadoop.gradle.util.WaitForURL
 import org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.InstanceConfiguration
 import org.gradle.api.GradleException
@@ -30,6 +30,8 @@ import java.nio.file.Path
 import java.nio.file.Paths
 import java.util.concurrent.TimeUnit
 
+import static org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.SettingsContainer.FileSettings
+
 /**
  * Generic information for any process running in a hadoop ecosystem.
 *
@@ -70,7 +72,7 @@ class InstanceInfo {
 
     /** The config files */
     List<File> configFiles
-    Map<String, Map<String, String>> configContents
+    Map<String, FileSettings> configContents
 
     /** Closure that renders the contents of the config file */
     Closure<String> configFileFormatter
@@ -84,8 +86,8 @@
     /** stdout/stderr log of the service process for this instance */
     File startLog
 
-    /** Major version of java this node runs with, or {@code null} if using the runtime java version */
-    Integer javaVersion
+    /** Location of the java installation to use when running processes **/
+    String javaHome
 
     /** environment variables to start the node with */
     Map<String, String> env
@@ -108,6 +110,9 @@
     /** buffer for ant output when starting this node */
     ByteArrayOutputStream buffer = new ByteArrayOutputStream()
 
+    /** Elasticsearch cluster dependency for tasks **/
+    ElasticsearchCluster elasticsearchCluster
+
     /**
      * A closure to call before the cluster is considered ready. The closure is passed the node info,
      * as well as a groovy AntBuilder, to enable running ant condition checks. The default wait
@@ -155,13 +160,16 @@
         startLog = new File(cwd, 'run.log')
 
         // We just default to the current runtime at this time
-        javaVersion = 8
+        javaHome = config.getJavaHome()
 
         // Prepare Environment
         env = [:]
         env.putAll(config.getEnvironmentVariables())
         config.getServiceDescriptor().finalizeEnv(env, config)
 
+        // Add JAVA_HOME to the environment
+        env['JAVA_HOME'] = javaHome
+
         // Prepare startup command and arguments
         args = []
         List<String> startCommandLine = config.getServiceDescriptor().startCommand(config)
@@ -202,6 +210,8 @@
         if (Os.isFamily(Os.FAMILY_WINDOWS)) {
             args.add('"') // end the entire command, quoted
         }
+
+        this.elasticsearchCluster = config.getElasticsearchCluster()
     }
 
     Path binPath() {
@@ -235,11 +245,6 @@
             throw new UnsupportedOperationException("JNAKernal32Library is compiled for Java 10 and up.")
         }
 
-    /** Return the java home used by this node. */
-    String getJavaHome() {
-        return javaVersion == null ? project.runtimeJavaHome : BuildParams.javaVersions.find { it.version == javaVersion }.javaHome.absolutePath
-    }
-
     /** Returns debug string for the command that started this node.
      */
     String getCommandString() {
         String commandString = "\nService ${config.serviceDescriptor.serviceName()}: ${config.roleDescriptor.roleName()} configuration:\n"
@@ -247,7 +252,6 @@
         commandString += "| cwd: ${cwd}\n"
         commandString += "| command: ${executable} ${args.join(' ')}\n"
         commandString += '| environment:\n'
-        commandString += "| JAVA_HOME: ${javaHome}\n"
         env.each { k, v -> commandString += "| ${k}: ${v}\n" }
         commandString += "|\n| [${backgroundScript.name}]\n"
         backgroundScript.eachLine('UTF-8', { line -> commandString += " ${line}\n"})
diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/ServiceDescriptor.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/ServiceDescriptor.groovy
index 0848460fe..4f01c94d3 100644
--- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/ServiceDescriptor.groovy
+++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/ServiceDescriptor.groovy
@@ -24,6 +24,8 @@ import org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.InstanceConfiguration
 import org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.ServiceConfiguration
 import org.elasticsearch.hadoop.gradle.tasks.ApacheMirrorDownload
 
+import static org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.SettingsContainer.FileSettings
+
 /**
  * Describes deployment characteristics for different Hadoop ecosystem projects.
  *
@@ -100,7 +102,7 @@
     /**
      * Collect all configuration entries, setting defaults for the service, role, and instance.
      */
-    Map<String, Map<String, String>> collectConfigFilesContents(InstanceConfiguration configuration)
+    Map<String, FileSettings> collectConfigFilesContents(InstanceConfiguration configuration)
 
     /**
      * Closure that formats a configuration map into a String for the config file contents.
      */
@@ -110,7 +112,7 @@
     /**
      * Produces the HTTP/S URI to reach the web front end for a running instance, or null if there is no web interface.
      */
-    String httpUri(InstanceConfiguration configuration, Map<String, Map<String, String>> configFileContents)
+    String httpUri(InstanceConfiguration configuration, Map<String, FileSettings> configFileContents)
 
     /**
      * The command line to use for starting the given role and instance.
diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/EndProcessConfiguration.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/EndProcessConfiguration.groovy
index a781d4539..412e509c0 100644
--- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/EndProcessConfiguration.groovy
+++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/EndProcessConfiguration.groovy
@@ -19,6 +19,9 @@
 
 package org.elasticsearch.hadoop.gradle.fixture.hadoop.conf
 
+import org.elasticsearch.gradle.testclusters.ElasticsearchCluster
+import org.gradle.api.Project
+
 /**
  * Provides defaults and can be slotted in as the last parent configuration in a chain.
* @@ -29,8 +32,8 @@ package org.elasticsearch.hadoop.gradle.fixture.hadoop.conf */ class EndProcessConfiguration extends ProcessConfiguration { - EndProcessConfiguration() { - super(null) + EndProcessConfiguration(Project project) { + super(project) } @Override @@ -68,6 +71,11 @@ class EndProcessConfiguration extends ProcessConfiguration { return Collections.emptyList() } + @Override + String getJavaHome() { + return project.runtimeJavaHome + } + @Override String getJvmArgs() { return "" @@ -77,4 +85,9 @@ class EndProcessConfiguration extends ProcessConfiguration { boolean getDebug() { return false } + + @Override + ElasticsearchCluster getElasticsearchCluster() { + return null + } } diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/HadoopClusterConfiguration.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/HadoopClusterConfiguration.groovy index 29e6ea525..045be006c 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/HadoopClusterConfiguration.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/HadoopClusterConfiguration.groovy @@ -47,11 +47,10 @@ class HadoopClusterConfiguration extends ProcessConfiguration { private static final Map SUPPORTED_SERVICES = [HADOOP, HIVE, PIG, SPARK] .collectEntries { [(it.id()): it] } - private static final ProcessConfiguration END = new EndProcessConfiguration() - private final Project project private final String name private final List clusterTasks + private final ProcessConfiguration defaultConfiguration private final Map serviceConfigurations private final List serviceCreationOrder @@ -60,6 +59,7 @@ class HadoopClusterConfiguration extends ProcessConfiguration { this.project = project this.name = name this.clusterTasks = [] + this.defaultConfiguration = new EndProcessConfiguration(project) this.serviceConfigurations = [:] this.serviceCreationOrder = [] } @@ -126,6 +126,6 @@ class HadoopClusterConfiguration extends ProcessConfiguration { @Override protected ProcessConfiguration parent() { - return END + return defaultConfiguration } } diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/ProcessConfiguration.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/ProcessConfiguration.groovy index 55a59d0b1..806f0ba77 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/ProcessConfiguration.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/ProcessConfiguration.groovy @@ -19,6 +19,7 @@ package org.elasticsearch.hadoop.gradle.fixture.hadoop.conf +import org.elasticsearch.gradle.testclusters.ElasticsearchCluster import org.gradle.api.InvalidUserDataException import org.gradle.api.Project import org.gradle.api.Task @@ -38,7 +39,7 @@ abstract class ProcessConfiguration { this.project = project } - private final Project project + protected final Project project private Map systemProperties = new HashMap<>() private Map environmentVariables = new HashMap<>() private SettingsContainer settingsContainer = new SettingsContainer() @@ -46,8 +47,10 @@ abstract class ProcessConfiguration { private LinkedHashMap setupCommands = new LinkedHashMap<>() private List dependencies = new ArrayList<>() private List clusterTasks = new ArrayList<>() + private String javaHome = null private String jvmArgs = '' private boolean debug = false + private ElasticsearchCluster 
elasticsearchCluster = null void addSystemProperty(String key, String value) { systemProperties.put(key, value) @@ -77,7 +80,7 @@ abstract class ProcessConfiguration { return combined } - void addSetting(String key, String value) { + void addSetting(String key, Object value) { settingsContainer.addSetting(key, value) } @@ -145,6 +148,23 @@ abstract class ProcessConfiguration { return combined } + void setJavaHome(String javaHome) { + this.javaHome = javaHome + } + + String getJavaHome() { + if (this.javaHome != null) { + return this.javaHome + } else { + ProcessConfiguration parent = parent() + if (parent != null) { + return parent.getJavaHome() + } else { + return null + } + } + } + void setJvmArgs(String jvmArgs) { this.jvmArgs = jvmArgs } @@ -178,6 +198,23 @@ abstract class ProcessConfiguration { return debug } + void useElasticsearchCluster(ElasticsearchCluster elasticsearchCluster) { + this.elasticsearchCluster = elasticsearchCluster + } + + ElasticsearchCluster getElasticsearchCluster() { + if (this.elasticsearchCluster != null) { + return this.elasticsearchCluster + } else { + ProcessConfiguration parent = parent() + if (parent != null) { + return parent.getElasticsearchCluster() + } else { + return null + } + } + } + Task createClusterTask(Map options) throws InvalidUserDataException { Task task = project.tasks.create(options) addClusterTask(task) diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/SettingsContainer.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/SettingsContainer.groovy index 79284a05d..701c8b374 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/SettingsContainer.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/conf/SettingsContainer.groovy @@ -19,28 +19,51 @@ package org.elasticsearch.hadoop.gradle.fixture.hadoop.conf +import static org.elasticsearch.hadoop.gradle.util.ObjectUtil.unapplyString + /** * Performs organization and addition of settings for a collection of settings files */ class SettingsContainer { static class FileSettings { - private Map settings = [:] + private Map settings + + FileSettings() { + this([:]) + } - void addSetting(String key, String value) { + FileSettings(Map settings) { + this.settings = settings + } + + void addSetting(String key, Object value) { settings.put(key, value) } - void settings(Map values) { + void settings(Map values) { settings.putAll(values) } - Map getSettings() { - return settings + void putIfAbsent(String key, Object value) { + settings.putIfAbsent(key, value) + } + + Map resolve() { + return settings.collectEntries { String k, Object v -> [(k): unapplyString(v)]} as Map + } + + String get(String key) { + Object value = settings.get(key) + return unapplyString(value) + } + + String getOrDefault(String key, Object value) { + return unapplyString(settings.getOrDefault(key, value)) } } - private Map globalSettings + private Map globalSettings private Map settingsFiles SettingsContainer() { @@ -48,11 +71,11 @@ class SettingsContainer { this.settingsFiles = [:] } - void addSetting(String key, String value) { + void addSetting(String key, Object value) { globalSettings.put(key, value) } - Map getSettings() { + Map getSettings() { return globalSettings } @@ -70,13 +93,13 @@ class SettingsContainer { } } - Map flattenFile(String filename) { - Map flattened = [:] + FileSettings flattenFile(String filename) { + Map flattened = [:] flattened.putAll(globalSettings) FileSettings 
fileSettings = settingsFiles.get(filename) if (fileSettings != null) { - flattened.putAll(fileSettings.getSettings()) + flattened.putAll(fileSettings.settings) } - return flattened + return new FileSettings(flattened) } } diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/HadoopServiceDescriptor.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/HadoopServiceDescriptor.groovy index ea80c81f4..0a15d81fe 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/HadoopServiceDescriptor.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/HadoopServiceDescriptor.groovy @@ -30,6 +30,8 @@ import org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.SettingsContainer import org.elasticsearch.hadoop.gradle.tasks.ApacheMirrorDownload import org.gradle.api.GradleException +import static org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.SettingsContainer.FileSettings + class HadoopServiceDescriptor implements ServiceDescriptor { static final Map> VERSION_MAP = [:] @@ -120,12 +122,12 @@ class HadoopServiceDescriptor implements ServiceDescriptor { } @Override - Map> collectConfigFilesContents(InstanceConfiguration configuration) { + Map collectConfigFilesContents(InstanceConfiguration configuration) { SettingsContainer container = configuration.getSettingsContainer() - Map> files = [:] + Map files = [:] // hdfs-site.xml: - Map hdfsSite = container.flattenFile('hdfs-site.xml') + FileSettings hdfsSite = container.flattenFile('hdfs-site.xml') // default replication should be 1 hdfsSite.putIfAbsent('dfs.replication', '1') @@ -152,7 +154,7 @@ class HadoopServiceDescriptor implements ServiceDescriptor { files.put('hdfs-site.xml', hdfsSite) // yarn-site.xml: - Map yarnSite = container.flattenFile('yarn-site.xml') + FileSettings yarnSite = container.flattenFile('yarn-site.xml') // Set the shuffle options yarnSite.putIfAbsent("yarn.nodemanager.aux-services", "mapreduce_shuffle") @@ -161,7 +163,7 @@ class HadoopServiceDescriptor implements ServiceDescriptor { files.put('yarn-site.xml', yarnSite) // mapred-site.xml - Map mapredSite = container.flattenFile('mapred-site.xml') + FileSettings mapredSite = container.flattenFile('mapred-site.xml') // history server addresses mapredSite.putIfAbsent('mapreduce.jobhistory.address', 'localhost:10020') @@ -170,14 +172,14 @@ class HadoopServiceDescriptor implements ServiceDescriptor { files.put('mapred-site.xml', mapredSite) // core-site.xml: - Map coreSite = container.flattenFile('core-site.xml') + FileSettings coreSite = container.flattenFile('core-site.xml') // default FS settings coreSite.putIfAbsent('fs.defaultFS', "hdfs://${hdfsSite.get('dfs.namenode.rpc-address')}") files.put('core-site.xml', coreSite) // ssl server settings (for HTTPS) - Map sslServer = container.flattenFile('ssl-server.xml') + FileSettings sslServer = container.flattenFile('ssl-server.xml') files.put('ssl-server.xml', sslServer) return files @@ -189,39 +191,39 @@ class HadoopServiceDescriptor implements ServiceDescriptor { } @Override - String httpUri(InstanceConfiguration configuration, Map> configFileContents) { + String httpUri(InstanceConfiguration configuration, Map configFileContents) { RoleDescriptor role = configuration.roleDescriptor if (NAMENODE.equals(role)) { - Map hdfsSite = configFileContents.get('hdfs-site.xml') + FileSettings hdfsSite = configFileContents.get('hdfs-site.xml') if ('HTTPS_ONLY' == hdfsSite.get('dfs.http.policy')) 
{ - return "https://${hdfsSite.get('dfs.namenode.https-address', 'localhost:50470')}" + return "https://${hdfsSite.getOrDefault('dfs.namenode.https-address', 'localhost:50470')}" } else { - return "http://${hdfsSite.get('dfs.namenode.http-address', 'localhost:50070')}" + return "http://${hdfsSite.getOrDefault('dfs.namenode.http-address', 'localhost:50070')}" } } else if (DATANODE.equals(role)) { - Map hdfsSite = configFileContents.get('hdfs-site.xml') + FileSettings hdfsSite = configFileContents.get('hdfs-site.xml') if ('HTTPS_ONLY' == hdfsSite.get('dfs.http.policy')) { - return "https://${hdfsSite.get('dfs.datanode.https-address', 'localhost:50475')}" + return "https://${hdfsSite.getOrDefault('dfs.datanode.https-address', 'localhost:50475')}" } else { - return "http://${hdfsSite.get('dfs.datanode.http-address', 'localhost:50075')}" + return "http://${hdfsSite.getOrDefault('dfs.datanode.http-address', 'localhost:50075')}" } } else if (RESOURCEMANAGER.equals(role)) { - Map yarnSite = configFileContents.get('yarn-site.xml') + FileSettings yarnSite = configFileContents.get('yarn-site.xml') if ('HTTPS_ONLY' == yarnSite.get('yarn.http.policy')) { - return "https://${yarnSite.get('yarn.resourcemanager.webapp.address', 'localhost:8090')}" + return "https://${yarnSite.getOrDefault('yarn.resourcemanager.webapp.address', 'localhost:8090')}" } else { - return "http://${yarnSite.get('yarn.resourcemanager.webapp.https.address', 'localhost:8088')}" + return "http://${yarnSite.getOrDefault('yarn.resourcemanager.webapp.https.address', 'localhost:8088')}" } } else if (NODEMANAGER.equals(role)) { - Map yarnSite = configFileContents.get('yarn-site.xml') + FileSettings yarnSite = configFileContents.get('yarn-site.xml') if ('HTTPS_ONLY' == yarnSite.get('yarn.http.policy')) { - return "https://${yarnSite.get('yarn.nodemanager.webapp.address', 'localhost:8042')}" + return "https://${yarnSite.getOrDefault('yarn.nodemanager.webapp.address', 'localhost:8042')}" } else { - return "http://${yarnSite.get('yarn.nodemanager.webapp.address', 'localhost:8042')}" + return "http://${yarnSite.getOrDefault('yarn.nodemanager.webapp.address', 'localhost:8042')}" } } else if (HISTORYSERVER.equals(role)) { - Map mapredSite = configFileContents.get('mapred-site.xml') - return "http://${mapredSite.get('mapreduce.jobhistory.webapp.address', 'localhost:19888')}" + FileSettings mapredSite = configFileContents.get('mapred-site.xml') + return "http://${mapredSite.getOrDefault('mapreduce.jobhistory.webapp.address', 'localhost:19888')}" } else if (GATEWAY.equals(role)) { return null // No web interface for Gateway } diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/HiveServiceDescriptor.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/HiveServiceDescriptor.groovy index 716ff0227..bef8457bf 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/HiveServiceDescriptor.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/HiveServiceDescriptor.groovy @@ -29,6 +29,8 @@ import org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.ServiceConfiguration import org.elasticsearch.hadoop.gradle.tasks.ApacheMirrorDownload import org.gradle.api.GradleException +import static org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.SettingsContainer.FileSettings + class HiveServiceDescriptor implements ServiceDescriptor { static final Map> VERSION_MAP = [:] @@ -114,8 +116,8 @@ class 
HiveServiceDescriptor implements ServiceDescriptor { } @Override - Map> collectConfigFilesContents(InstanceConfiguration configuration) { - Map hiveSite = configuration.getSettingsContainer().flattenFile('hive-site.xml') + Map collectConfigFilesContents(InstanceConfiguration configuration) { + FileSettings hiveSite = configuration.getSettingsContainer().flattenFile('hive-site.xml') return ['hive-site.xml' : hiveSite] } @@ -125,7 +127,7 @@ class HiveServiceDescriptor implements ServiceDescriptor { } @Override - String httpUri(InstanceConfiguration configuration, Map> configFileContents) { + String httpUri(InstanceConfiguration configuration, Map configFileContents) { if (HIVESERVER.equals(configuration.roleDescriptor)) { return null } diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/PigServiceDescriptor.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/PigServiceDescriptor.groovy index b57a48c62..fa8df3c1a 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/PigServiceDescriptor.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/PigServiceDescriptor.groovy @@ -29,6 +29,8 @@ import org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.ServiceConfiguration import org.elasticsearch.hadoop.gradle.tasks.ApacheMirrorDownload import org.gradle.api.GradleException +import static org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.SettingsContainer.FileSettings + class PigServiceDescriptor implements ServiceDescriptor { static final Map> VERSION_MAP = [:] @@ -112,7 +114,7 @@ class PigServiceDescriptor implements ServiceDescriptor { } @Override - Map> collectConfigFilesContents(InstanceConfiguration configuration) { + Map collectConfigFilesContents(InstanceConfiguration configuration) { return ['pig.properties' : configuration.getSettingsContainer().flattenFile('pig.properties')] } @@ -122,7 +124,7 @@ class PigServiceDescriptor implements ServiceDescriptor { } @Override - String httpUri(InstanceConfiguration configuration, Map> configFileContents) { + String httpUri(InstanceConfiguration configuration, Map configFileContents) { if (GATEWAY.equals(configuration.roleDescriptor)) { return null } diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/SparkYarnServiceDescriptor.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/SparkYarnServiceDescriptor.groovy index 951aef134..170aa91fd 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/SparkYarnServiceDescriptor.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/services/SparkYarnServiceDescriptor.groovy @@ -29,12 +29,15 @@ import org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.ServiceConfiguration import org.elasticsearch.hadoop.gradle.tasks.ApacheMirrorDownload import org.gradle.api.GradleException +import static org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.SettingsContainer.FileSettings + class SparkYarnServiceDescriptor implements ServiceDescriptor { + static final Version VERSION = new Version(2, 3, 4) static final Map> VERSION_MAP = [:] static { - VERSION_MAP.put(new Version(2, 3, 3), - ['SHA-512': '27CF9AD268E684D6926201BB5478F7F5A410659972BC79FC14AF61245EFF50C9A4363E400311B2FA9E1326E8AF1EB6DDE1D359B88B9143F25A49B3E11596B002']) + VERSION_MAP.put(VERSION, + ['SHA-512': 
'9FBEFCE2739990FFEDE6968A9C2F3FE399430556163BFDABDF5737A8F9E52CD535489F5CA7D641039A87700F50BFD91A706CA47979EE51A3A18787A92E2D6D53']) } static RoleDescriptor GATEWAY = RoleDescriptor.requiredGateway('spark', []) @@ -61,7 +64,7 @@ class SparkYarnServiceDescriptor implements ServiceDescriptor { @Override Version defaultVersion() { - return new Version(2, 3, 3) + return VERSION } @Override @@ -116,7 +119,7 @@ class SparkYarnServiceDescriptor implements ServiceDescriptor { } @Override - Map> collectConfigFilesContents(InstanceConfiguration configuration) { + Map collectConfigFilesContents(InstanceConfiguration configuration) { return ['spark-defaults.conf' : configuration.getSettingsContainer().flattenFile('spark-defaults.conf')] } @@ -126,7 +129,7 @@ class SparkYarnServiceDescriptor implements ServiceDescriptor { } @Override - String httpUri(InstanceConfiguration configuration, Map> configFileContents) { + String httpUri(InstanceConfiguration configuration, Map configFileContents) { if (GATEWAY.equals(configuration.roleDescriptor)) { return null } diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/AbstractClusterTask.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/AbstractClusterTask.groovy index 6e16924b7..c6534b06b 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/AbstractClusterTask.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/AbstractClusterTask.groovy @@ -19,14 +19,15 @@ package org.elasticsearch.hadoop.gradle.fixture.hadoop.tasks +import org.elasticsearch.gradle.testclusters.DefaultTestClustersTask import org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.HadoopClusterConfiguration import org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.InstanceConfiguration -import org.gradle.api.DefaultTask -abstract class AbstractClusterTask extends DefaultTask { +abstract class AbstractClusterTask extends DefaultTestClustersTask { HadoopClusterConfiguration clusterConfiguration InstanceConfiguration executedOn + Map environmentVariables = [:] AbstractClusterTask() { super() @@ -37,4 +38,34 @@ abstract class AbstractClusterTask extends DefaultTask { executedOn = instance } + abstract InstanceConfiguration defaultInstance(HadoopClusterConfiguration clusterConfiguration) + abstract Map taskEnvironmentVariables() + + protected getInstance() { + return executedOn == null ? defaultInstance(this.clusterConfiguration) : executedOn + } + + protected Map collectEnvVars() { + InstanceConfiguration instance = getInstance() + + Map finalEnv = [:] + + // Set JAVA_HOME + finalEnv['JAVA_HOME'] = instance.javaHome + + // User provided environment variables from the cluster configuration + finalEnv.putAll(instance.getEnvironmentVariables()) + + // Finalize the environment variables using the service descriptor + instance.getServiceDescriptor().finalizeEnv(finalEnv, instance) + + // Add any environment variables that might be based on the specific + // task's configuration (jvm options via env, lib jars, etc...) 
+ finalEnv.putAll(taskEnvironmentVariables()) + + // Add the explicit env variables from this task instance at the end + finalEnv.putAll(environmentVariables) + + return finalEnv + } } diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/DfsCopy.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/DfsCopy.groovy index 90dba13ba..bcba37091 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/DfsCopy.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/DfsCopy.groovy @@ -33,7 +33,6 @@ class DfsCopy extends AbstractClusterTask { File localSource File dfsDestination File localDestination - Map env = [:] File getDfsSource() { return dfsSource @@ -95,20 +94,17 @@ class DfsCopy extends AbstractClusterTask { setLocalDestination(path) } - Map getEnv() { - return env - } - - void setEnv(Map env) { - this.env = env - } - - void env(String key, String val) { - env.put(key, val) + @Override + InstanceConfiguration defaultInstance(HadoopClusterConfiguration clusterConfiguration) { + return clusterConfiguration + .service(HadoopClusterConfiguration.HADOOP) + .role(HadoopServiceDescriptor.GATEWAY) + .instance(0) } - void env(Map values) { - env.putAll(values) + @Override + Map taskEnvironmentVariables() { + return [:] } @TaskAction @@ -125,10 +121,7 @@ class DfsCopy extends AbstractClusterTask { } // Gateway conf - InstanceConfiguration hadoopGateway = clusterConfiguration - .service(HadoopClusterConfiguration.HADOOP) - .role(HadoopServiceDescriptor.GATEWAY) - .instance(0) + InstanceConfiguration hadoopGateway = getInstance() // Determine command File baseDir = hadoopGateway.getBaseDir() @@ -153,9 +146,7 @@ class DfsCopy extends AbstractClusterTask { } // Combine env and sysprops - Map finalEnv = hadoopGateway.getEnvironmentVariables() - hadoopGateway.getServiceDescriptor().finalizeEnv(finalEnv, hadoopGateway) - finalEnv.putAll(env) + Map finalEnv = collectEnvVars() // First ensure destination directories exist if (dfsDestination != null) { diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/HadoopMRJob.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/HadoopMRJob.groovy index 186a8d263..59f73062e 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/HadoopMRJob.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/HadoopMRJob.groovy @@ -26,24 +26,29 @@ import org.gradle.api.GradleException import org.gradle.api.tasks.TaskAction import org.gradle.process.ExecSpec +import static org.elasticsearch.hadoop.gradle.util.ObjectUtil.unapplyString + class HadoopMRJob extends AbstractClusterTask { String jobClass File jobJar - Map jobSettings = [:] + Map jobSettings = [:] List libJars = [] List args = [] Map systemProperties = [:] - Map environmentVariables = [:] - void jobSetting(String key, String value) { + void jobSetting(String key, Object value) { jobSettings.put(key, value) } - void jobSettings(Map settings) { + void jobSettings(Map settings) { jobSettings.putAll(settings) } + void libJars(File... 
files) { + libJars.addAll(files) + } + void systemProperty(String key, String value) { systemProperties.put(key, value) } @@ -52,6 +57,50 @@ class HadoopMRJob extends AbstractClusterTask { systemProperties.putAll(settings) } + @Override + InstanceConfiguration defaultInstance(HadoopClusterConfiguration clusterConfiguration) { + return clusterConfiguration + .service(HadoopClusterConfiguration.HADOOP) + .role(HadoopServiceDescriptor.GATEWAY) + .instance(0) + } + + @Override + Map taskEnvironmentVariables() { + Map taskEnv = [:] + InstanceConfiguration instance = getInstance() + + // Add lib jars to the user classpath if needed + if (!libJars.isEmpty()) { + taskEnv.put('YARN_USER_CLASSPATH', libJars.join(":")) + } + + // Collect the java properties and JVM options from the cluster configuration, as well as the system props from + // this specific task. Add them to the environment variable that Hadoop would expect them to be specified under. + String javaPropertyEnvVariable = instance.getServiceDescriptor().javaOptsEnvSetting(instance) + if (javaPropertyEnvVariable != null) { + List javaOpts = [taskEnv.get(javaPropertyEnvVariable, '')] + javaOpts.add(instance.getJvmArgs()) + for (Map propertyMap : [instance.getSystemProperties(), systemProperties]) { + String collectedSystemProperties = propertyMap + .collect { key, value -> "-D${key}=${value}" } + .join(" ") + if (!collectedSystemProperties.isEmpty()) { + javaOpts.add(collectedSystemProperties) + } + } + // Force the javaOptsEnvSetting to be the correct one for executing a hadoop jar command. + // We might be running this command "on a different instance" than gateway + // (it might be running on namenode, and be using namenode's env property names) + String hadoopJarCommandJavaPropertyEnvVariable = instance + .getServiceDescriptor() + .javaOptsEnvSetting(defaultInstance(clusterConfiguration)) + taskEnv.put(hadoopJarCommandJavaPropertyEnvVariable, javaOpts.join(" ").trim()) + } + + return taskEnv + } + @TaskAction void runYarnJar() { // Verification @@ -85,35 +134,13 @@ class HadoopMRJob extends AbstractClusterTask { commandLine.addAll(['-libjars', libJars.join(',')]) } if (!jobSettings.isEmpty()) { - commandLine.addAll(jobSettings.collect { k, v -> "-D${k}=${v}"}) + commandLine.addAll(jobSettings.collect { k, v -> "-D${k}=${unapplyString(v)}"}) } if (!args.isEmpty()) { commandLine.addAll(args) } - Map finalEnv = hadoopGateway.getEnvironmentVariables() - hadoopGateway.getServiceDescriptor().finalizeEnv(finalEnv, hadoopGateway) - - if (!libJars.isEmpty()) { - finalEnv.put('YARN_USER_CLASSPATH', libJars.join(":")) - } - - String javaPropertyEnvVariable = hadoopGateway.getServiceDescriptor().javaOptsEnvSetting(hadoopGateway) - if (javaPropertyEnvVariable != null) { - List javaOpts = [finalEnv.get(javaPropertyEnvVariable, '')] - javaOpts.add(hadoopGateway.getJvmArgs()) - for (Map propertyMap : [hadoopGateway.getSystemProperties(), systemProperties]) { - String collectedSystemProperties = propertyMap - .collect { key, value -> "-D${key}=${value}" } - .join(" ") - if (!collectedSystemProperties.isEmpty()) { - javaOpts.add(collectedSystemProperties) - } - } - finalEnv.put('YARN_OPTS', javaOpts.join(" ")) - } - - finalEnv.putAll(environmentVariables) + Map finalEnv = collectEnvVars() // Do command project.logger.info("Executing Command: " + commandLine) diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/HiveBeeline.groovy 
b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/HiveBeeline.groovy index 1dce657e2..364030296 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/HiveBeeline.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/HiveBeeline.groovy @@ -26,12 +26,30 @@ import org.gradle.api.GradleException import org.gradle.api.tasks.TaskAction import org.gradle.process.ExecSpec +import static org.elasticsearch.hadoop.gradle.fixture.hadoop.conf.SettingsContainer.FileSettings + class HiveBeeline extends AbstractClusterTask { File script List libJars = [] String hivePrincipal - Map env = [:] + + void libJars(File... files) { + libJars.addAll(files) + } + + @Override + InstanceConfiguration defaultInstance(HadoopClusterConfiguration clusterConfiguration) { + return clusterConfiguration + .service(HadoopClusterConfiguration.HIVE) + .role(HiveServiceDescriptor.HIVESERVER) + .instance(0) + } + + @Override + Map taskEnvironmentVariables() { + return [:] + } @TaskAction void runBeeline() { @@ -41,10 +59,7 @@ class HiveBeeline extends AbstractClusterTask { } // Gateway conf - InstanceConfiguration hiveServer = clusterConfiguration - .service(HadoopClusterConfiguration.HIVE) - .role(HiveServiceDescriptor.HIVESERVER) - .instance(0) + InstanceConfiguration hiveServer = getInstance() File baseDir = hiveServer.getBaseDir() File homeDir = new File(baseDir, hiveServer.getServiceDescriptor().homeDirName(hiveServer)) @@ -63,10 +78,7 @@ class HiveBeeline extends AbstractClusterTask { commandLine.addAll(['-f', finalScript.toString()]) } - // Use the service descriptor to pick up HADOOP_HOME= - Map environment = hiveServer.getEnvironmentVariables() - hiveServer.getServiceDescriptor().finalizeEnv(environment, hiveServer) - environment.putAll(env) + Map environment = collectEnvVars() project.logger.info("Using Environment: $environment") project.exec { ExecSpec spec -> @@ -76,7 +88,7 @@ class HiveBeeline extends AbstractClusterTask { } static String getConnectionString(InstanceConfiguration hiveServer, String hivePrincipal) { - Map hiveconf = hiveServer + FileSettings hiveconf = hiveServer .getServiceDescriptor() .collectConfigFilesContents(hiveServer) .get('hive-site.xml') diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/PigScript.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/PigScript.groovy index e3c5ce082..bda002b80 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/PigScript.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/PigScript.groovy @@ -30,7 +30,23 @@ class PigScript extends AbstractClusterTask { File script List libJars = [] - Map env = [:] + + void libJars(File... 
files) { + libJars.addAll(files) + } + + @Override + InstanceConfiguration defaultInstance(HadoopClusterConfiguration clusterConfiguration) { + return clusterConfiguration + .service(HadoopClusterConfiguration.PIG) + .role(PigServiceDescriptor.GATEWAY) + .instance(0) + } + + @Override + Map taskEnvironmentVariables() { + return [:] + } @TaskAction void runPig() { @@ -43,10 +59,7 @@ class PigScript extends AbstractClusterTask { } // Gateway conf - InstanceConfiguration pigGateway = clusterConfiguration - .service(HadoopClusterConfiguration.PIG) - .role(PigServiceDescriptor.GATEWAY) - .instance(0) + InstanceConfiguration pigGateway = getInstance() File baseDir = pigGateway.getBaseDir() File homeDir = new File(baseDir, pigGateway.getServiceDescriptor().homeDirName(pigGateway)) @@ -67,9 +80,7 @@ class PigScript extends AbstractClusterTask { } // Use the service descriptor to pick up HADOOP_HOME= - Map environment = pigGateway.getEnvironmentVariables() - pigGateway.getServiceDescriptor().finalizeEnv(environment, pigGateway) - environment.putAll(env) + Map environment = collectEnvVars() // Additional env's // PIG_HEAPSIZE - In MB diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/SparkApp.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/SparkApp.groovy index e80d81cfb..7b82c40a6 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/SparkApp.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/fixture/hadoop/tasks/SparkApp.groovy @@ -26,6 +26,8 @@ import org.gradle.api.GradleException import org.gradle.api.tasks.TaskAction import org.gradle.process.ExecSpec +import static org.elasticsearch.hadoop.gradle.util.ObjectUtil.unapplyString + class SparkApp extends AbstractClusterTask { enum Master { @@ -40,12 +42,11 @@ class SparkApp extends AbstractClusterTask { File jobJar Master master = Master.YARN DeployMode deployMode = DeployMode.CLIENT - Map jobSettings = [:] + Map jobSettings = [:] String principal String keytab List libJars = [] List args = [] - Map env = [:] void deployMode(DeployMode mode) { deployMode = mode @@ -59,14 +60,31 @@ class SparkApp extends AbstractClusterTask { deployMode = DeployMode.CLUSTER } - void jobSetting(String key, String value) { + void jobSetting(String key, Object value) { jobSettings.put(key, value) } - void jobSettings(Map configs) { + void jobSettings(Map configs) { jobSettings.putAll(configs) } + void libJars(File... 
files) { + libJars.addAll(files) + } + + @Override + InstanceConfiguration defaultInstance(HadoopClusterConfiguration clusterConfiguration) { + return clusterConfiguration + .service(HadoopClusterConfiguration.SPARK) + .role(SparkYarnServiceDescriptor.GATEWAY) + .instance(0) + } + + @Override + Map taskEnvironmentVariables() { + return [:] + } + @TaskAction void runSparkSubmit() { //Verification @@ -81,10 +99,7 @@ class SparkApp extends AbstractClusterTask { } // Gateway conf - InstanceConfiguration sparkGateway = clusterConfiguration - .service(HadoopClusterConfiguration.SPARK) - .role(SparkYarnServiceDescriptor.GATEWAY) - .instance(0) + InstanceConfiguration sparkGateway = getInstance() File baseDir = sparkGateway.getBaseDir() File homeDir = new File(baseDir, sparkGateway.getServiceDescriptor().homeDirName(sparkGateway)) @@ -113,7 +128,7 @@ class SparkApp extends AbstractClusterTask { commandLine.addAll(['--jars', libJars.join(',')]) } - jobSettings.collect { k, v -> /$k=$v/ }.forEach { conf -> commandLine.add('--conf'); commandLine.add(conf) } + jobSettings.collect { k, v -> /$k=${unapplyString(v)}/ }.forEach { conf -> commandLine.add('--conf'); commandLine.add(conf) } if (DeployMode.CLUSTER.equals(deployMode) && (principal != null || keytab != null)) { if (principal == null || keytab == null) { @@ -126,9 +141,7 @@ class SparkApp extends AbstractClusterTask { commandLine.addAll(args) // HADOOP_CONF_DIR=..../etc/hadoop - Map finalEnv = sparkGateway.getEnvironmentVariables() - sparkGateway.getServiceDescriptor().finalizeEnv(finalEnv, sparkGateway) - finalEnv.putAll(env) + Map finalEnv = collectEnvVars() // Do command project.logger.info("Command Env: " + finalEnv) diff --git a/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/util/ObjectUtil.groovy b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/util/ObjectUtil.groovy new file mode 100644 index 000000000..cad9f57a6 --- /dev/null +++ b/buildSrc/src/main/groovy/org/elasticsearch/hadoop/gradle/util/ObjectUtil.groovy @@ -0,0 +1,34 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.hadoop.gradle.util + +import java.util.concurrent.Callable + +class ObjectUtil { + static String unapplyString(Object value) { + if (value == null) { + return null + } else if (value instanceof Callable) { + return ((Closure) value).call().toString() + } else { + return value.toString() + } + } +} diff --git a/qa/kerberos/build.gradle b/qa/kerberos/build.gradle index 4d7097dc7..b18c2d74a 100644 --- a/qa/kerberos/build.gradle +++ b/qa/kerberos/build.gradle @@ -151,7 +151,7 @@ if (disableTests) { // Configure MiniKDC AntFixture kdcFixture = project.tasks.create('kdcFixture', AntFixture) { dependsOn project.configurations.kdcFixture - executable = new File(project.compilerJavaHome, 'bin/java') + executable = new File(project.runtimeJavaHome, 'bin/java') env 'CLASSPATH', "${ -> project.configurations.kdcFixture.asPath }" waitCondition = { fixture, ant -> // the kdc wrapper writes the ports file when @@ -218,7 +218,8 @@ if (disableTests) { extraConfigFile('es.keytab', esKeytab.toFile()) } - + def esAddress = "${-> testClusters.integTest.getAllHttpSocketURI().get(0)}" + // Configure Integration Test Task Test integrationTest = project.tasks.findByName('integrationTest') as Test integrationTest.dependsOn(kdcFixture) @@ -242,9 +243,10 @@ if (disableTests) { useCluster(testClusters.integTest) doLast { project.javaexec { + executable = project.runtimeJavaHome.toString() + "/bin/java" main = 'org.elasticsearch.hadoop.qa.kerberos.setup.SetupKerberosUsers' classpath = sourceSets.main.runtimeClasspath - systemProperty('es.nodes', 'localhost:9500') + systemProperty('es.nodes', esAddress) systemProperty('es.net.http.auth.user', 'test_admin') systemProperty('es.net.http.auth.pass', 'x-pack-test-password') systemProperty('principals', "$clientPrincipal$realm") @@ -265,6 +267,7 @@ if (disableTests) { // Hadoop cluster depends on KDC Fixture being up and running config.addDependency(kdcFixture) + config.useElasticsearchCluster(testClusters.integTest) config.service('hadoop') { ServiceConfiguration s -> s.addSystemProperty("java.security.krb5.conf", krb5Conf.toString()) @@ -330,7 +333,7 @@ if (disableTests) { r.addEnvironmentVariable('YARN_USER_CLASSPATH', testingJar.archivePath.toString()) r.settingsFile('yarn-site.xml') { SettingsContainer.FileSettings f -> // Add settings specifically for ES Node to allow for cancelling the tokens - f.addSetting('es.nodes', 'localhost:9500') + f.addSetting('es.nodes', esAddress) } } } @@ -344,6 +347,7 @@ if (disableTests) { s.addSetting('yarn.app.mapreduce.am.command-opts', "-Djava.security.krb5.conf=${krb5Conf.toString()}") s.addSetting('mapreduce.map.java.opts', "-Djava.security.krb5.conf=${krb5Conf.toString()}") s.addSetting('mapreduce.reduce.java.opts', "-Djava.security.krb5.conf=${krb5Conf.toString()}") + s.addSetting('es.nodes', esAddress) } config.service('pig') { ServiceConfiguration s -> s.addSetting('java.security.krb5.conf', krb5Conf.toString()) @@ -352,6 +356,7 @@ if (disableTests) { s.addSetting('yarn.app.mapreduce.am.command-opts', "-Djava.security.krb5.conf=${krb5Conf.toString()}") s.addSetting('mapreduce.map.java.opts', "-Djava.security.krb5.conf=${krb5Conf.toString()}") s.addSetting('mapreduce.reduce.java.opts', "-Djava.security.krb5.conf=${krb5Conf.toString()}") + s.addSetting('es.nodes', esAddress) } config.addDependency(jar) config.addDependency(testingJar) @@ -417,7 +422,6 @@ if (disableTests) { "test.krb5.keytab": clientKeytab.toString(), "java.security.krb5.conf": krb5Conf.toString() ]) - 
environmentVariables.put('HADOOP_ROOT_LOGGER','TRACE,console') args = ['-copyFromLocal', project.file(new File(mrItestResourceDir, 'artists.dat')), "/data/artists/artists.dat"] } @@ -428,20 +432,21 @@ if (disableTests) { // Run the MR job to load data to ES. Ensure Kerberos settings are available. HadoopMRJob mrLoadData = config.createClusterTask('mrLoadData', HadoopMRJob.class) { clusterConfiguration = config + useCluster(testClusters.integTest) dependsOn(copyData, setupUsers) jobJar = jar.archivePath - libJars.add(testingJar.archivePath) + libJars(testingJar.archivePath) jobClass = 'org.elasticsearch.hadoop.qa.kerberos.mr.LoadToES' - jobSettings = [ + jobSettings([ 'es.resource': 'qa_kerberos_mr_data', - 'es.nodes': 'localhost:9500', + 'es.nodes': esAddress, 'es.security.authentication': 'kerberos', 'es.net.spnego.auth.elasticsearch.principal': "${esPrincipal}${realm}", 'load.field.names': 'number,name,url,picture,@timestamp,tag', 'mapreduce.map.java.opts': "-Djava.security.krb5.conf=${krb5Conf.toString()}", 'mapreduce.reduce.java.opts': "-Djava.security.krb5.conf=${krb5Conf.toString()}", 'yarn.app.mapreduce.am.command-opts': "-Djava.security.krb5.conf=${krb5Conf.toString()}" - ] + ]) systemProperties([ "test.krb5.principal": clientPrincipal, "test.krb5.keytab": clientKeytab.toString(), @@ -454,19 +459,20 @@ if (disableTests) { // Run the MR job to read data out of ES. Ensure Kerberos settings are available. HadoopMRJob mrReadData = config.createClusterTask('mrReadData', HadoopMRJob.class) { clusterConfiguration = config + useCluster(testClusters.integTest) dependsOn(mrLoadData) jobJar = jar.archivePath - libJars.add(testingJar.archivePath) + libJars(testingJar.archivePath) jobClass = 'org.elasticsearch.hadoop.qa.kerberos.mr.ReadFromES' - jobSettings = [ + jobSettings([ 'es.resource': 'qa_kerberos_mr_data', - 'es.nodes': 'localhost:9500', + 'es.nodes': esAddress, 'es.security.authentication': 'kerberos', 'es.net.spnego.auth.elasticsearch.principal': "${esPrincipal}${realm}", 'mapreduce.map.java.opts': "-Djava.security.krb5.conf=${krb5Conf.toString()}", 'mapreduce.reduce.java.opts': "-Djava.security.krb5.conf=${krb5Conf.toString()}", 'yarn.app.mapreduce.am.command-opts': "-Djava.security.krb5.conf=${krb5Conf.toString()}" - ] + ]) systemProperties([ "test.krb5.principal": clientPrincipal, "test.krb5.keytab": clientKeytab.toString(), @@ -483,16 +489,17 @@ if (disableTests) { // Run the Spark job to load data to ES. Ensure Kerberos settings are available. 
SparkApp sparkLoadData = config.createClusterTask('sparkLoadData', SparkApp.class) { clusterConfiguration = config + useCluster(testClusters.integTest) dependsOn(copyData) // deployModeCluster() // principal = clientPrincipal + realm // keytab = clientKeytab.toString() jobJar = jar.archivePath - libJars.add(testingJar.archivePath) + libJars(testingJar.archivePath) jobClass = 'org.elasticsearch.hadoop.qa.kerberos.spark.LoadToES' jobSettings([ 'spark.es.resource': 'qa_kerberos_spark_data', - 'spark.es.nodes': 'localhost:9500', + 'spark.es.nodes': esAddress, 'spark.es.security.authentication': 'kerberos', 'spark.es.net.spnego.auth.elasticsearch.principal': "${esPrincipal}${realm}", 'spark.load.field.names': 'number,name,url,picture,@timestamp,tag', @@ -500,7 +507,7 @@ if (disableTests) { 'spark.driver.extraJavaOptions': "-Djava.security.krb5.conf=${krb5Conf.toString()}", 'spark.executor.extraJavaOptions': "-Djava.security.krb5.conf=${krb5Conf.toString()}" ]) - env.put('SPARK_SUBMIT_OPTS', "-Djava.security.krb5.conf=${krb5Conf.toString()} " + + environmentVariables.put('SPARK_SUBMIT_OPTS', "-Djava.security.krb5.conf=${krb5Conf.toString()} " + "-Dtest.krb5.principal=$clientPrincipal$realm " + "-Dtest.krb5.keytab=${clientKeytab.toString()}") args = ['/data/artists/artists.dat'] @@ -510,23 +517,24 @@ if (disableTests) { // Run the Spark job to load data to ES. Ensure Kerberos settings are available. SparkApp sparkReadData = config.createClusterTask('sparkReadData', SparkApp.class) { clusterConfiguration = config + useCluster(testClusters.integTest) dependsOn(sparkLoadData) // deployModeCluster() // principal = clientPrincipal + realm // keytab = clientKeytab.toString() jobJar = jar.archivePath - libJars.add(testingJar.archivePath) + libJars(testingJar.archivePath) jobClass = 'org.elasticsearch.hadoop.qa.kerberos.spark.ReadFromES' jobSettings([ 'spark.es.resource': 'qa_kerberos_spark_data', - 'spark.es.nodes': 'localhost:9500', + 'spark.es.nodes': esAddress, 'spark.es.security.authentication': 'kerberos', 'spark.es.net.spnego.auth.elasticsearch.principal': "${esPrincipal}${realm}", 'spark.yarn.am.extraJavaOptions': "-Djava.security.krb5.conf=${krb5Conf.toString()}", 'spark.driver.extraJavaOptions': "-Djava.security.krb5.conf=${krb5Conf.toString()}", 'spark.executor.extraJavaOptions': "-Djava.security.krb5.conf=${krb5Conf.toString()}" ]) - env.put('SPARK_SUBMIT_OPTS', "-Djava.security.krb5.conf=${krb5Conf.toString()} " + + environmentVariables.put('SPARK_SUBMIT_OPTS', "-Djava.security.krb5.conf=${krb5Conf.toString()} " + "-Dtest.krb5.principal=$clientPrincipal$realm " + "-Dtest.krb5.keytab=${clientKeytab.toString()}") args = ['/data/output/spark'] @@ -553,11 +561,12 @@ if (disableTests) { HiveBeeline hiveLoadData = config.createClusterTask('hiveLoadData', HiveBeeline.class) { clusterConfiguration = config + useCluster(testClusters.integTest) dependsOn(jar, setupUsers, copyData, patchBeeline) hivePrincipal = hivePrincipalName + realm script = new File(resourceDir, 'hive/load_to_es.sql') - libJars.add(testingJar.archivePath) - env.putAll([ + libJars(testingJar.archivePath) + environmentVariables.putAll([ 'HADOOP_CLIENT_OPTS': "-Djava.security.krb5.conf=${krb5Conf.toString()} " + "-Dtest.krb5.principal=$clientPrincipal$realm " + @@ -569,11 +578,12 @@ if (disableTests) { HiveBeeline hiveReadData = config.createClusterTask('hiveReadData', HiveBeeline.class) { clusterConfiguration = config + useCluster(testClusters.integTest) dependsOn(hiveLoadData) hivePrincipal = hivePrincipalName + realm script = new 
File(resourceDir, 'hive/read_from_es.sql') - libJars.add(testingJar.archivePath) - env.putAll([ + libJars(testingJar.archivePath) + environmentVariables.putAll([ 'HADOOP_CLIENT_OPTS': "-Djava.security.krb5.conf=${krb5Conf.toString()} " + "-Dtest.krb5.principal=$clientPrincipal$realm " + @@ -589,23 +599,25 @@ if (disableTests) { PigScript pigLoadData = config.createClusterTask('pigLoadData', PigScript.class) { clusterConfiguration = config + useCluster(testClusters.integTest) dependsOn(jar, setupUsers, copyData) script = new File(resourceDir, 'pig/load_to_es.pig') - libJars.add(testingJar.archivePath) - env = [ + libJars(testingJar.archivePath) + environmentVariables.putAll([ 'PIG_OPTS': "-Djava.security.krb5.conf=${krb5Conf.toString()}" - ] + ]) } integrationTest.dependsOn(pigLoadData) PigScript pigReadData = config.createClusterTask('pigReadData', PigScript.class) { clusterConfiguration = config + useCluster(testClusters.integTest) dependsOn(pigLoadData) script = new File(resourceDir, 'pig/read_from_es.pig') - libJars.add(testingJar.archivePath) - env = [ + libJars(testingJar.archivePath) + environmentVariables.putAll([ 'PIG_OPTS': "-Djava.security.krb5.conf=${krb5Conf.toString()}" - ] + ]) } integrationTest.dependsOn(pigReadData) @@ -644,7 +656,6 @@ if (disableTests) { "test.krb5.keytab": clientKeytab.toString(), "java.security.krb5.conf": krb5Conf.toString() ]) - environmentVariables.put('HADOOP_ROOT_LOGGER','TRACE,console') args = ['-copyToLocal', "/data/output/$integrationName", outputDataDir] } // Integration test needs to depend on copy output tasks diff --git a/qa/kerberos/src/main/java/org/elasticsearch/hadoop/qa/kerberos/setup/SetupKerberosUsers.java b/qa/kerberos/src/main/java/org/elasticsearch/hadoop/qa/kerberos/setup/SetupKerberosUsers.java index 8f303ab55..6840f4d0e 100644 --- a/qa/kerberos/src/main/java/org/elasticsearch/hadoop/qa/kerberos/setup/SetupKerberosUsers.java +++ b/qa/kerberos/src/main/java/org/elasticsearch/hadoop/qa/kerberos/setup/SetupKerberosUsers.java @@ -19,8 +19,6 @@ package org.elasticsearch.hadoop.qa.kerberos.setup; -import java.util.Properties; - import org.apache.commons.logging.LogFactory; import org.elasticsearch.hadoop.cfg.PropertiesSettings; import org.elasticsearch.hadoop.cfg.Settings; diff --git a/qa/kerberos/src/main/java/org/elasticsearch/hadoop/qa/kerberos/storm/StreamFromEs.java b/qa/kerberos/src/main/java/org/elasticsearch/hadoop/qa/kerberos/storm/StreamFromEs.java index 4efa6059f..1a3c4b570 100644 --- a/qa/kerberos/src/main/java/org/elasticsearch/hadoop/qa/kerberos/storm/StreamFromEs.java +++ b/qa/kerberos/src/main/java/org/elasticsearch/hadoop/qa/kerberos/storm/StreamFromEs.java @@ -37,12 +37,13 @@ public class StreamFromEs { public static void main(String[] args) throws Exception { final String submitPrincipal = args[0]; final String submitKeytab = args[1]; + final String esNodes = args[2]; LoginContext loginContext = LoginUtil.keytabLogin(submitPrincipal, submitKeytab); try { Subject.doAs(loginContext.getSubject(), new PrivilegedExceptionAction() { @Override public Void run() throws Exception { - submitJob(submitPrincipal, submitKeytab); + submitJob(submitPrincipal, submitKeytab, esNodes); return null; } }); @@ -51,7 +52,7 @@ public Void run() throws Exception { } } - public static void submitJob(String principal, String keytab) throws Exception { + public static void submitJob(String principal, String keytab, String esNodes) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("ES", new 
diff --git a/qa/kerberos/src/main/java/org/elasticsearch/hadoop/qa/kerberos/storm/StreamToEs.java b/qa/kerberos/src/main/java/org/elasticsearch/hadoop/qa/kerberos/storm/StreamToEs.java
index 19f004eaa..025706a80 100644
--- a/qa/kerberos/src/main/java/org/elasticsearch/hadoop/qa/kerberos/storm/StreamToEs.java
+++ b/qa/kerberos/src/main/java/org/elasticsearch/hadoop/qa/kerberos/storm/StreamToEs.java
@@ -41,12 +41,13 @@ public class StreamToEs {
     public static void main(String[] args) throws Exception {
         final String submitPrincipal = args[0];
         final String submitKeytab = args[1];
+        final String esNodes = args[2];
         LoginContext loginContext = LoginUtil.keytabLogin(submitPrincipal, submitKeytab);
         try {
             Subject.doAs(loginContext.getSubject(), new PrivilegedExceptionAction<Void>() {
                 @Override
                 public Void run() throws Exception {
-                    submitJob(submitPrincipal, submitKeytab);
+                    submitJob(submitPrincipal, submitKeytab, esNodes);
                     return null;
                 }
             });
@@ -55,7 +56,7 @@ public Void run() throws Exception {
         }
     }
 
-    public static void submitJob(String principal, String keytab) throws Exception {
+    public static void submitJob(String principal, String keytab, String esNodes) throws Exception {
         List<String> doc1 = Collections.singletonList("{\"reason\" : \"business\",\"airport\" : \"SFO\"}");
         List<String> doc2 = Collections.singletonList("{\"participants\" : 5,\"airport\" : \"OTP\"}");
 
@@ -71,7 +72,7 @@ public static void submitJob(String principal, String keytab) throws Exception {
         List<String> plugins = new ArrayList<String>();
         plugins.add(AutoElasticsearch.class.getName());
         conf.put(Config.TOPOLOGY_AUTO_CREDENTIALS, plugins);
-        conf.put(ConfigurationOptions.ES_PORT, "9500");
+        conf.put(ConfigurationOptions.ES_NODES, esNodes);
         conf.put(ConfigurationOptions.ES_SECURITY_AUTHENTICATION, "kerberos");
         conf.put(ConfigurationOptions.ES_NET_SPNEGO_AUTH_ELASTICSEARCH_PRINCIPAL, "HTTP/build.elastic.co@BUILD.ELASTIC.CO");
         conf.put(ConfigurationOptions.ES_INPUT_JSON, "true");
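Swapping ES_PORT for ES_NODES works because es.nodes accepts a comma-separated list of host:port pairs, so the fixed port 9500 is subsumed by whatever address the build passes in. Illustration with assumed values:

    // Illustration only: es.nodes carries both host and port, possibly for
    // several nodes, making a separate es.port setting redundant.
    conf.put(ConfigurationOptions.ES_NODES, "localhost:9200,localhost:9201");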
diff --git a/qa/kerberos/src/main/resources/hive/load_to_es.sql b/qa/kerberos/src/main/resources/hive/load_to_es.sql
index 51b6519f0..f4d485399 100644
--- a/qa/kerberos/src/main/resources/hive/load_to_es.sql
+++ b/qa/kerberos/src/main/resources/hive/load_to_es.sql
@@ -26,7 +26,6 @@ CREATE EXTERNAL TABLE IF NOT EXISTS es_artist_data (
 STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
 TBLPROPERTIES(
     'es.resource' = 'qa_kerberos_hive_data',
-    'es.nodes' = 'localhost:9500',
     'es.security.authentication' = 'kerberos',
     'es.net.spnego.auth.elasticsearch.principal' = 'HTTP/build.elastic.co@BUILD.ELASTIC.CO'
 );
diff --git a/qa/kerberos/src/main/resources/hive/read_from_es.sql b/qa/kerberos/src/main/resources/hive/read_from_es.sql
index 0959b3e1d..280c4a833 100644
--- a/qa/kerberos/src/main/resources/hive/read_from_es.sql
+++ b/qa/kerberos/src/main/resources/hive/read_from_es.sql
@@ -8,7 +8,7 @@ CREATE EXTERNAL TABLE IF NOT EXISTS es_artist_data (
     ts TIMESTAMP,
     tag STRING)
 STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
-TBLPROPERTIES('es.resource' = 'qa_kerberos_hive_data', 'es.nodes' = 'localhost:9500');
+TBLPROPERTIES('es.resource' = 'qa_kerberos_hive_data');
 
 DROP TABLE IF EXISTS artist_data;
diff --git a/qa/kerberos/src/main/resources/pig/load_to_es.pig b/qa/kerberos/src/main/resources/pig/load_to_es.pig
index 6aec1af48..425eac16f 100644
--- a/qa/kerberos/src/main/resources/pig/load_to_es.pig
+++ b/qa/kerberos/src/main/resources/pig/load_to_es.pig
@@ -1,7 +1,6 @@
 A = LOAD '/data/artists' USING PigStorage('\t') AS (number: chararray, name: chararray, uri: chararray, picture: chararray, timestamp: chararray, tag: chararray);
 
 STORE A INTO 'qa_kerberos_pig_data' USING org.elasticsearch.hadoop.pig.EsStorage(
-    'es.nodes = localhost:9500',
     'es.security.authentication = kerberos',
     'es.net.spnego.auth.elasticsearch.principal = HTTP/build.elastic.co@BUILD.ELASTIC.CO'
 );
\ No newline at end of file
diff --git a/qa/kerberos/src/main/resources/pig/read_from_es.pig b/qa/kerberos/src/main/resources/pig/read_from_es.pig
index 51cc74632..6617c8ded 100644
--- a/qa/kerberos/src/main/resources/pig/read_from_es.pig
+++ b/qa/kerberos/src/main/resources/pig/read_from_es.pig
@@ -1,5 +1,4 @@
 A = LOAD 'qa_kerberos_pig_data' USING org.elasticsearch.hadoop.pig.EsStorage(
-    'es.nodes = localhost:9500',
     'es.security.authentication = kerberos',
     'es.net.spnego.auth.elasticsearch.principal = HTTP/build.elastic.co@BUILD.ELASTIC.CO'
 );
diff --git a/test/fixtures/minikdc/build.gradle b/test/fixtures/minikdc/build.gradle
index 1e3fa5117..3dcd6fc7a 100644
--- a/test/fixtures/minikdc/build.gradle
+++ b/test/fixtures/minikdc/build.gradle
@@ -31,6 +31,10 @@ dependencies {
     }
 }
 
+// Target Java 1.8 compilation
+sourceCompatibility = '1.8'
+targetCompatibility = '1.8'
+
 // for testing, until fixture are actually debuggable.
 // gradle hides EVERYTHING so you have no clue what went wrong.
 task kdc(type: JavaExec) {