Merge pull request apache#353 from palantir/rk/upstream-bump
Small upstream bump
robert3005 authored Apr 4, 2018
2 parents 6da0b82 + 33658e4 commit 3bf8055
Showing 49 changed files with 1,240 additions and 645 deletions.
@@ -33,18 +33,21 @@ private[deploy] object DependencyUtils {
packagesExclusions: String,
packages: String,
repositories: String,
ivyRepoPath: String): String = {
ivyRepoPath: String,
ivySettingsPath: Option[String]): String = {
val exclusions: Seq[String] =
if (!StringUtils.isBlank(packagesExclusions)) {
packagesExclusions.split(",")
} else {
Nil
}
// Create the IvySettings, either load from file or build defaults
val ivySettings = sys.props.get("spark.jars.ivySettings").map { ivySettingsFile =>
SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(repositories), Option(ivyRepoPath))
}.getOrElse {
SparkSubmitUtils.buildIvySettings(Option(repositories), Option(ivyRepoPath))
val ivySettings = ivySettingsPath match {
case Some(path) =>
SparkSubmitUtils.loadIvySettings(path, Option(repositories), Option(ivyRepoPath))

case None =>
SparkSubmitUtils.buildIvySettings(Option(repositories), Option(ivyRepoPath))
}

SparkSubmitUtils.resolveMavenCoordinates(packages, ivySettings, exclusions = exclusions)
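The new `ivySettingsPath` parameter decides whether resolution uses a user-supplied `ivysettings.xml` or default settings. As a rough, hedged sketch of what the two branches amount to, here is the same choice expressed directly against the Apache Ivy API (Spark's `SparkSubmitUtils` helpers additionally wire in the configured repositories and local Ivy path; the function name is illustrative):

```scala
import java.io.File
import org.apache.ivy.core.settings.IvySettings

// Sketch only: mirrors the load-from-file vs. build-defaults choice above.
def makeIvySettings(ivySettingsPath: Option[String]): IvySettings =
  ivySettingsPath match {
    case Some(path) =>
      val settings = new IvySettings()
      settings.load(new File(path)) // parse the user's ivysettings.xml
      settings
    case None =>
      new IvySettings()             // fall back to Ivy's defaults
  }
```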
@@ -359,7 +359,8 @@ object SparkSubmit extends CommandLineUtils with Logging {
// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
// too for packages that include Python code
val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
args.packagesExclusions, args.packages, args.repositories, args.ivyRepoPath)
args.packagesExclusions, args.packages, args.repositories, args.ivyRepoPath,
args.ivySettingsPath)

if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
@@ -63,6 +63,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
var packages: String = null
var repositories: String = null
var ivyRepoPath: String = null
var ivySettingsPath: Option[String] = None
var packagesExclusions: String = null
var verbose: Boolean = false
var isPython: Boolean = false
@@ -184,6 +185,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull
files = Option(files).orElse(sparkProperties.get("spark.files")).orNull
ivyRepoPath = sparkProperties.get("spark.jars.ivy").orNull
ivySettingsPath = sparkProperties.get("spark.jars.ivySettings")
packages = Option(packages).orElse(sparkProperties.get("spark.jars.packages")).orNull
packagesExclusions = Option(packagesExclusions)
.orElse(sparkProperties.get("spark.jars.excludes")).orNull
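For context, a hedged sketch of how a user might exercise the new property: write an Ivy settings file that resolves against Maven Central and point `spark.jars.ivySettings` at it (the file path and package coordinate below are placeholders):

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// Minimal ivysettings.xml resolving --packages coordinates against Maven Central.
val settingsXml =
  """<ivysettings>
    |  <settings defaultResolver="central"/>
    |  <resolvers>
    |    <ibiblio name="central" m2compatible="true"/>
    |  </resolvers>
    |</ivysettings>""".stripMargin

Files.write(Paths.get("/tmp/ivysettings.xml"),
  settingsXml.getBytes(StandardCharsets.UTF_8))

// Then submit with, e.g.:
//   spark-submit --packages com.example:lib:1.0 \
//     --conf spark.jars.ivySettings=/tmp/ivysettings.xml ...
```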
@@ -79,12 +79,17 @@ object DriverWrapper extends Logging {
val secMgr = new SecurityManager(sparkConf)
val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf)

val Seq(packagesExclusions, packages, repositories, ivyRepoPath) =
Seq("spark.jars.excludes", "spark.jars.packages", "spark.jars.repositories", "spark.jars.ivy")
.map(sys.props.get(_).orNull)
val Seq(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath) =
Seq(
"spark.jars.excludes",
"spark.jars.packages",
"spark.jars.repositories",
"spark.jars.ivy",
"spark.jars.ivySettings"
).map(sys.props.get(_).orNull)

val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(packagesExclusions,
packages, repositories, ivyRepoPath)
packages, repositories, ivyRepoPath, Option(ivySettingsPath))
val jars = {
val jarsProp = sys.props.get("spark.jars").orNull
if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
@@ -164,7 +164,8 @@ class BlockManagerMasterEndpoint(
val futures = blockManagerInfo.values.map { bm =>
bm.slaveEndpoint.ask[Int](removeMsg).recover {
case e: IOException =>
logWarning(s"Error trying to remove RDD $rddId", e)
logWarning(s"Error trying to remove RDD $rddId from block manager ${bm.blockManagerId}",
e)
0 // zero blocks were removed
}
}.toSeq
@@ -195,7 +196,8 @@ class BlockManagerMasterEndpoint(
val futures = requiredBlockManagers.map { bm =>
bm.slaveEndpoint.ask[Int](removeMsg).recover {
case e: IOException =>
logWarning(s"Error trying to remove broadcast $broadcastId", e)
logWarning(s"Error trying to remove broadcast $broadcastId from block manager " +
s"${bm.blockManagerId}", e)
0 // zero blocks were removed
}
}.toSeq
@@ -106,6 +106,9 @@ class SparkSubmitSuite
// Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
implicit val defaultSignaler: Signaler = ThreadSignaler

private val emptyIvySettings = File.createTempFile("ivy", ".xml")
FileUtils.write(emptyIvySettings, "<ivysettings />", StandardCharsets.UTF_8)

override def beforeEach() {
super.beforeEach()
}
@@ -520,6 +523,7 @@ class SparkSubmitSuite
"--repositories", repo,
"--conf", "spark.ui.enabled=false",
"--conf", "spark.master.rest.enabled=false",
"--conf", s"spark.jars.ivySettings=${emptyIvySettings.getAbsolutePath()}",
unusedJar.toString,
"my.great.lib.MyLib", "my.great.dep.MyLib")
runSparkSubmit(args)
@@ -530,7 +534,6 @@ class SparkSubmitSuite
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val main = MavenCoordinate("my.great.lib", "mylib", "0.1")
val dep = MavenCoordinate("my.great.dep", "mylib", "0.1")
// Test using "spark.jars.packages" and "spark.jars.repositories" configurations.
IvyTestUtils.withRepository(main, Some(dep.toString), None) { repo =>
val args = Seq(
"--class", JarCreationTest.getClass.getName.stripSuffix("$"),
@@ -540,6 +543,7 @@ class SparkSubmitSuite
"--conf", s"spark.jars.repositories=$repo",
"--conf", "spark.ui.enabled=false",
"--conf", "spark.master.rest.enabled=false",
"--conf", s"spark.jars.ivySettings=${emptyIvySettings.getAbsolutePath()}",
unusedJar.toString,
"my.great.lib.MyLib", "my.great.dep.MyLib")
runSparkSubmit(args)
@@ -550,7 +554,6 @@ class SparkSubmitSuite
// See https://gist.github.com/shivaram/3a2fecce60768a603dac for an error log
ignore("correctly builds R packages included in a jar with --packages") {
assume(RUtils.isRInstalled, "R isn't installed on this machine.")
// Check if the SparkR package is installed
assume(RUtils.isSparkRInstalled, "SparkR is not installed in this build.")
val main = MavenCoordinate("my.great.lib", "mylib", "0.1")
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
@@ -563,6 +566,7 @@ class SparkSubmitSuite
"--master", "local-cluster[2,1,1024]",
"--packages", main.toString,
"--repositories", repo,
"--conf", s"spark.jars.ivySettings=${emptyIvySettings.getAbsolutePath()}",
"--verbose",
"--conf", "spark.ui.enabled=false",
rScriptDir)
@@ -573,7 +577,6 @@ class SparkSubmitSuite
test("include an external JAR in SparkR") {
assume(RUtils.isRInstalled, "R isn't installed on this machine.")
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
// Check if the SparkR package is installed
assume(RUtils.isSparkRInstalled, "SparkR is not installed in this build.")
val rScriptDir =
Seq(sparkHome, "R", "pkg", "tests", "fulltests", "jarTest.R").mkString(File.separator)
2 changes: 1 addition & 1 deletion docs/ml-guide.md
@@ -111,7 +111,7 @@ and the migration guide below will explain all changes between releases.
* The class and trait hierarchy for logistic regression model summaries was changed to be cleaner
and better accommodate the addition of the multi-class summary. This is a breaking change for user
code that casts a `LogisticRegressionTrainingSummary` to a
` BinaryLogisticRegressionTrainingSummary`. Users should instead use the `model.binarySummary`
`BinaryLogisticRegressionTrainingSummary`. Users should instead use the `model.binarySummary`
method. See [SPARK-17139](https://issues.apache.org/jira/browse/SPARK-17139) for more detail
(_note_ this is an `Experimental` API). This _does not_ affect the Python `summary` method, which
will still work correctly for both multinomial and binary cases.
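Since the note above only describes the change in prose, here is a brief, hedged Scala sketch of the recommended access pattern (the `training` DataFrame is assumed to exist with the usual "label"/"features" columns):

```scala
import org.apache.spark.ml.classification.LogisticRegression

val model = new LogisticRegression().fit(training)

// Before: casting the generic summary
// val binarySummary = model.summary.asInstanceOf[BinaryLogisticRegressionTrainingSummary]

// After: use the dedicated accessor (fails if the model is not binary)
val binarySummary = model.binarySummary
println(binarySummary.areaUnderROC)
```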
4 changes: 2 additions & 2 deletions docs/mllib-feature-extraction.md
@@ -278,8 +278,8 @@ for details on the API.
multiplication. In other words, it scales each column of the dataset by a scalar multiplier. This
represents the [Hadamard product](https://en.wikipedia.org/wiki/Hadamard_product_%28matrices%29)
between the input vector, `v` and transforming vector, `scalingVec`, to yield a result vector.
Qu8T948*1#
Denoting the `scalingVec` as "`w`," this transformation may be written as:

Denoting the `scalingVec` as "`w`", this transformation may be written as:

`\[ \begin{pmatrix}
v_1 \\
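As a concrete illustration of the elementwise (Hadamard) scaling described above, a short RDD-based API sketch (the vector values are arbitrary):

```scala
import org.apache.spark.mllib.feature.ElementwiseProduct
import org.apache.spark.mllib.linalg.Vectors

val scalingVec = Vectors.dense(0.0, 1.0, 2.0)      // the transforming vector "w"
val transformer = new ElementwiseProduct(scalingVec)

val v = Vectors.dense(1.0, 2.0, 3.0)
val result = transformer.transform(v)              // (1*0, 2*1, 3*2) = [0.0, 2.0, 6.0]
```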
4 changes: 2 additions & 2 deletions docs/mllib-pmml-model-export.md
@@ -7,15 +7,15 @@ displayTitle: PMML model export - RDD-based API
* Table of contents
{:toc}

## `spark.mllib` supported models
## spark.mllib supported models

`spark.mllib` supports model export to Predictive Model Markup Language ([PMML](http://en.wikipedia.org/wiki/Predictive_Model_Markup_Language)).

The table below outlines the `spark.mllib` models that can be exported to PMML and their equivalent PMML model.

<table class="table">
<thead>
<tr><th>`spark.mllib` model</th><th>PMML model</th></tr>
<tr><th>spark.mllib model</th><th>PMML model</th></tr>
</thead>
<tbody>
<tr>
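For reference, a short sketch of the export call itself, using k-means as one of the supported models (the data and output path are placeholders; `sc` is an existing `SparkContext`):

```scala
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val data = sc.parallelize(Seq(
  Vectors.dense(1.0, 1.0), Vectors.dense(1.5, 1.8),
  Vectors.dense(9.0, 8.0), Vectors.dense(8.0, 9.0)))

val model = KMeans.train(data, 2, 20)   // k = 2, maxIterations = 20

println(model.toPMML())                 // PMML document as a String
model.toPMML("/tmp/kmeans.pmml")        // or write it to a local path
```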
15 changes: 12 additions & 3 deletions docs/running-on-kubernetes.md
@@ -549,14 +549,23 @@ specific to Spark on Kubernetes.
<td><code>spark.kubernetes.driver.limit.cores</code></td>
<td>(none)</td>
<td>
Specify the hard CPU [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for the driver pod.
Specify a hard cpu [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for the driver pod.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.request.cores</code></td>
<td>(none)</td>
<td>
Specify the cpu request for each executor pod. Values conform to the Kubernetes [convention](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#meaning-of-cpu).
Example values include 0.1, 500m, 1.5, 5, etc., with the definition of cpu units documented in [CPU units](https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/#cpu-units).
This is distinct from <code>spark.executor.cores</code>: it is only used and takes precedence over <code>spark.executor.cores</code> for specifying the executor pod cpu request if set. Task
parallelism, e.g., number of tasks an executor can run concurrently is not affected by this.
</tr>
<tr>
<td><code>spark.kubernetes.executor.limit.cores</code></td>
<td>(none)</td>
<td>
Specify the hard CPU [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for each executor pod launched for the Spark Application.
Specify a hard cpu [limit](https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container) for each executor pod launched for the Spark Application.
</td>
</tr>
<tr>
@@ -593,4 +602,4 @@ specific to Spark on Kubernetes.
<code>spark.kubernetes.executor.secrets.spark-secret=/etc/secrets</code>.
</td>
</tr>
</table>
</table>
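To make the new executor CPU settings concrete, a hedged sketch of how they might be supplied (normally passed with `--conf` on `spark-submit`; shown on a `SparkConf` purely for illustration, and the values are arbitrary):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.cores", "1")                        // task slots per executor
  .set("spark.kubernetes.executor.request.cores", "500m")  // pod CPU request; takes precedence over the above for the request only
  .set("spark.kubernetes.executor.limit.cores", "1")       // hard CPU limit per executor pod
  .set("spark.kubernetes.driver.limit.cores", "1")         // hard CPU limit for the driver pod
```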
@@ -27,13 +27,10 @@ import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

/**
* A [[ContinuousReader]] for data from kafka.
@@ -20,18 +20,16 @@ package org.apache.spark.sql.kafka010
import org.apache.kafka.clients.consumer.ConsumerRecord

import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.unsafe.types.UTF8String

/** A simple class for converting Kafka ConsumerRecord to UnsafeRow */
private[kafka010] class KafkaRecordToUnsafeRowConverter {
private val sharedRow = new UnsafeRow(7)
private val bufferHolder = new BufferHolder(sharedRow)
private val rowWriter = new UnsafeRowWriter(bufferHolder, 7)
private val rowWriter = new UnsafeRowWriter(7)

def toUnsafeRow(record: ConsumerRecord[Array[Byte], Array[Byte]]): UnsafeRow = {
bufferHolder.reset()
rowWriter.reset()

if (record.key == null) {
rowWriter.setNullAt(0)
@@ -46,7 +44,6 @@ private[kafka010] class KafkaRecordToUnsafeRowConverter {
5,
DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(record.timestamp)))
rowWriter.write(6, record.timestampType.id)
sharedRow.setTotalSize(bufferHolder.totalSize)
sharedRow
rowWriter.getRow()
}
}
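For clarity, a small standalone sketch of the single-object `UnsafeRowWriter` pattern this hunk migrates to, with no separate `BufferHolder` (Catalyst-internal API; the field count and values below are arbitrary):

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
import org.apache.spark.unsafe.types.UTF8String

val writer = new UnsafeRowWriter(3)   // writer now owns its buffer internally
writer.reset()                        // prepare for a new record, as the converter above does
writer.write(0, 42L)
writer.write(1, UTF8String.fromString("value"))
writer.setNullAt(2)
val row: UnsafeRow = writer.getRow()  // row size is tracked by the writer itself
```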
@@ -196,6 +196,14 @@ public void testAppHandleDisconnect() throws Exception {
Socket s = new Socket(InetAddress.getLoopbackAddress(), server.getPort());
client = new TestClient(s);
client.send(new Hello(secret, "1.4.0"));
client.send(new SetAppId("someId"));

// Wait until we know the server has received the messages and matched the handle to the
// connection before disconnecting.
eventually(Duration.ofSeconds(1), Duration.ofMillis(10), () -> {
assertEquals("someId", handle.getAppId());
});

handle.disconnect();
waitForError(client, secret);
} finally {
@@ -234,7 +234,7 @@ class StringIndexerModel (
val metadata = NominalAttribute.defaultAttr
.withName($(outputCol)).withValues(filteredLabels).toMetadata()
// If we are skipping invalid records, filter them out.
val (filteredDataset, keepInvalid) = getHandleInvalid match {
val (filteredDataset, keepInvalid) = $(handleInvalid) match {
case StringIndexer.SKIP_INVALID =>
val filterer = udf { label: String =>
labelToIndex.contains(label)