Merge remote-tracking branch 'upstream/branch-2.3' into branch-2.3
e-dorigatti committed Jun 12, 2018
2 parents 7628936 + bf58687 commit e7db468
Showing 19 changed files with 260 additions and 144 deletions.
29 changes: 28 additions & 1 deletion core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy

import java.io.File
import java.net.{InetAddress, URI}
import java.nio.file.Files

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
@@ -48,7 +49,7 @@ object PythonRunner {

// Format python file paths before adding them to the PYTHONPATH
val formattedPythonFile = formatPath(pythonFile)
val formattedPyFiles = formatPaths(pyFiles)
val formattedPyFiles = resolvePyFiles(formatPaths(pyFiles))

// Launch a Py4J gateway server for the process to connect to; this will let it see our
// Java system properties and such
@@ -153,4 +154,30 @@ object PythonRunner {
.map { p => formatPath(p, testWindows) }
}

/**
* Resolves the ".py" files. A ".py" file should not be added as-is because PYTHONPATH does
* not expect individual files. This method creates a temporary directory and copies into it
* any ".py" files that exist among the given paths.
*/
private def resolvePyFiles(pyFiles: Array[String]): Array[String] = {
lazy val dest = Utils.createTempDir(namePrefix = "localPyFiles")
pyFiles.flatMap { pyFile =>
// In client mode with spark-submit, the Python paths should be set before context
// initialization because the context initialization may happen later.
// We copy each local ".py" file because a ".py" file should not be added to
// PYTHONPATH by itself; its parent directory should be. See SPARK-24384.
if (pyFile.endsWith(".py")) {
val source = new File(pyFile)
if (source.exists() && source.isFile && source.canRead) {
Files.copy(source.toPath, new File(dest, source.getName).toPath)
Some(dest.getAbsolutePath)
} else {
// Don't have to add it if it doesn't exist or isn't readable.
None
}
} else {
Some(pyFile)
}
}.distinct
}
}
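For context, the helper above returns the temporary directory rather than the ".py" file itself because PYTHONPATH (and sys.path) entries must be directories or archives, not individual files. A minimal Python sketch of the same idea, using a hypothetical dependency path and only standard-library calls (this is an illustration, not Spark code):

import os
import shutil
import sys
import tempfile

# Hypothetical dependency passed via --py-files; putting the raw file path on
# sys.path would not make "mod1" importable.
py_file = "/tmp/deps/mod1.py"

dest = tempfile.mkdtemp(prefix="localPyFiles")  # analogous to Utils.createTempDir
if py_file.endswith(".py") and os.path.isfile(py_file):
    shutil.copy(py_file, dest)      # copy the file into the directory...
    sys.path.insert(0, dest)        # ...and put the *directory* on the path

# "import mod1" now works, which is what resolvePyFiles achieves by returning
# the temporary directory instead of the ".py" path.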
@@ -42,7 +42,7 @@ import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils}
* up to launch speculative tasks, etc.
*
* Clients should first call initialize() and start(), then submit task sets through the
* runTasks method.
* submitTasks method.
*
* THREADING: [[SchedulerBackend]]s and task-submitting clients can call this class from multiple
* threads, so it needs locks in public API methods to maintain its state. In addition, some
@@ -62,7 +62,7 @@ private[spark] class TaskSchedulerImpl(
this(sc, sc.conf.get(config.MAX_TASK_FAILURES))
}

// Lazily initializing blackListTrackOpt to avoid getting empty ExecutorAllocationClient,
// Lazily initializing blacklistTrackerOpt to avoid getting empty ExecutorAllocationClient,
// because ExecutorAllocationClient is created after this TaskSchedulerImpl.
private[scheduler] lazy val blacklistTrackerOpt = maybeCreateBlacklistTracker(sc)

@@ -228,7 +228,7 @@ private[spark] class TaskSchedulerImpl(
// 1. The task set manager has been created and some tasks have been scheduled.
// In this case, send a kill signal to the executors to kill the task and then abort
// the stage.
// 2. The task set manager has been created but no tasks has been scheduled. In this case,
// 2. The task set manager has been created but no tasks have been scheduled. In this case,
// simply abort the stage.
tsm.runningTasksSet.foreach { tid =>
taskIdToExecutorId.get(tid).foreach(execId =>
@@ -694,7 +694,7 @@ private[spark] class TaskSchedulerImpl(
*
* After stage failure and retry, there may be multiple TaskSetManagers for the stage.
* If an earlier attempt of a stage completes a task, we should ensure that the later attempts
* do not also submit those same tasks. That also means that a task completion from an earlier
* do not also submit those same tasks. That also means that a task completion from an earlier
* attempt can lead to the entire stage getting marked as successful.
*/
private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int) = {
@@ -206,7 +206,9 @@ private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends We
jobs: Seq[v1.JobData],
killEnabled: Boolean): Seq[Node] = {
// stripXSS is called to remove suspicious characters used in XSS attacks
val allParameters = request.getParameterMap.asScala.toMap.mapValues(_.map(UIUtils.stripXSS))
val allParameters = request.getParameterMap.asScala.toMap.map { case (k, v) =>
UIUtils.stripXSS(k) -> v.map(UIUtils.stripXSS).toSeq
}
val parameterOtherTable = allParameters.filterNot(_._1.startsWith(jobTag))
.map(para => para._1 + "=" + para._2(0))

12 changes: 8 additions & 4 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -117,8 +117,7 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We

val localitySummary = store.localitySummary(stageData.stageId, stageData.attemptId)

val totalTasks = stageData.numActiveTasks + stageData.numCompleteTasks +
stageData.numFailedTasks + stageData.numKilledTasks
val totalTasks = taskCount(stageData)
if (totalTasks == 0) {
val content =
<div>
@@ -133,7 +132,7 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
val totalTasksNumStr = if (totalTasks == storedTasks) {
s"$totalTasks"
} else {
s"$totalTasks, showing ${storedTasks}"
s"$storedTasks, showing ${totalTasks}"
}

val summary =
@@ -678,7 +677,7 @@ private[ui] class TaskDataSource(

private var _tasksToShow: Seq[TaskData] = null

override def dataSize: Int = stage.numTasks
override def dataSize: Int = taskCount(stage)

override def sliceData(from: Int, to: Int): Seq[TaskData] = {
if (_tasksToShow == null) {
@@ -1044,4 +1043,9 @@ private[ui] object ApiHelper {
(stage.map(_.name).getOrElse(""), stage.flatMap(_.description).getOrElse(job.name))
}

def taskCount(stageData: StageData): Int = {
stageData.numActiveTasks + stageData.numCompleteTasks + stageData.numFailedTasks +
stageData.numKilledTasks
}

}
@@ -43,7 +43,9 @@ private[ui] class StageTableBase(
killEnabled: Boolean,
isFailedStage: Boolean) {
// stripXSS is called to remove suspicious characters used in XSS attacks
val allParameters = request.getParameterMap.asScala.toMap.mapValues(_.map(UIUtils.stripXSS))
val allParameters = request.getParameterMap.asScala.toMap.map { case (k, v) =>
UIUtils.stripXSS(k) -> v.map(UIUtils.stripXSS).toSeq
}
val parameterOtherTable = allParameters.filterNot(_._1.startsWith(stageTag))
.map(para => para._1 + "=" + para._2(0))

9 changes: 9 additions & 0 deletions docs/sql-programming-guide.md
@@ -1737,6 +1737,15 @@ To use `groupBy().apply()`, the user needs to define the following:
* A Python function that defines the computation for each group.
* A `StructType` object or a string that defines the schema of the output `DataFrame`.

The output schema will be applied to the columns of the returned `pandas.DataFrame` in order by position,
not by name. This means that the columns in the `pandas.DataFrame` must be indexed so that their
position matches the corresponding field in the schema.

Note that when creating a new `pandas.DataFrame` using a dictionary, the actual position of the column
can differ from the order that it was placed in the dictionary. It is recommended in this case to
explicitly define the column order using the `columns` keyword, e.g.
`pandas.DataFrame({'id': ids, 'a': data}, columns=['id', 'a'])`, or alternatively use an `OrderedDict`.
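As an illustration, here is a minimal sketch of a grouped map UDF that returns its result with an explicit column order, assuming Spark 2.3's `pandas_udf`/`PandasUDFType.GROUPED_MAP` API and an active `SparkSession`; the data and column names are made up for the example:

from collections import OrderedDict

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ("id", "v"))

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    # Use an OrderedDict (or columns=['id', 'v']) so the column positions match
    # the "id long, v double" schema by position, not by name.
    return pd.DataFrame(OrderedDict([("id", pdf.id), ("v", pdf.v - pdf.v.mean())]))

df.groupby("id").apply(subtract_mean).show()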

Note that all data for a group will be loaded into memory before the function is applied. This can
lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for
[maxRecordsPerBatch](#setting-arrow-batch-size) is not applied on groups and it is up to the user
3 changes: 2 additions & 1 deletion project/SparkBuild.scala
@@ -728,7 +728,8 @@ object Unidoc {

scalacOptions in (ScalaUnidoc, unidoc) ++= Seq(
"-groups", // Group similar methods together based on the @group annotation.
"-skip-packages", "org.apache.hadoop"
"-skip-packages", "org.apache.hadoop",
"-sourcepath", (baseDirectory in ThisBuild).value.getAbsolutePath
) ++ (
// Add links to sources when generating Scaladoc for a non-snapshot release
if (!isSnapshot.value) {
9 changes: 8 additions & 1 deletion python/pyspark/sql/functions.py
@@ -2216,7 +2216,8 @@ def pandas_udf(f=None, returnType=None, functionType=None):
A grouped map UDF defines transformation: A `pandas.DataFrame` -> A `pandas.DataFrame`
The returnType should be a :class:`StructType` describing the schema of the returned
`pandas.DataFrame`.
The length of the returned `pandas.DataFrame` can be arbitrary.
The length of the returned `pandas.DataFrame` can be arbitrary and the columns must be
indexed so that their position matches the corresponding field in the schema.
Grouped map UDFs are used with :meth:`pyspark.sql.GroupedData.apply`.
@@ -2239,6 +2240,12 @@ def pandas_udf(f=None, returnType=None, functionType=None):
| 2| 1.1094003924504583|
+---+-------------------+
.. note:: If returning a new `pandas.DataFrame` constructed with a dictionary, it is
recommended to explicitly index the columns by name to ensure the positions are correct,
or alternatively use an `OrderedDict`.
For example, `pd.DataFrame({'id': ids, 'a': data}, columns=['id', 'a'])` or
`pd.DataFrame(OrderedDict([('id', ids), ('a', data)]))`.
.. seealso:: :meth:`pyspark.sql.GroupedData.apply`
.. note:: The user-defined functions are considered deterministic by default. Due to
@@ -263,16 +263,11 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
"PYSPARK_ARCHIVES_PATH" -> pythonPath.map("local:" + _).mkString(File.pathSeparator),
"PYTHONPATH" -> pythonPath.mkString(File.pathSeparator)) ++ extraEnv

val moduleDir =
if (clientMode) {
// In client-mode, .py files added with --py-files are not visible in the driver.
// This is something that the launcher library would have to handle.
tempDir
} else {
val subdir = new File(tempDir, "pyModules")
subdir.mkdir()
subdir
}
val moduleDir = {
val subdir = new File(tempDir, "pyModules")
subdir.mkdir()
subdir
}
val pyModule = new File(moduleDir, "mod1.py")
Files.write(TEST_PYMODULE, pyModule, StandardCharsets.UTF_8)

@@ -161,13 +161,17 @@ object DecimalType extends AbstractDataType {
* This method is used only when `spark.sql.decimalOperations.allowPrecisionLoss` is set to true.
*/
private[sql] def adjustPrecisionScale(precision: Int, scale: Int): DecimalType = {
// Assumptions:
// Assumption:
assert(precision >= scale)
assert(scale >= 0)

if (precision <= MAX_PRECISION) {
// Adjustment only needed when we exceed max precision
DecimalType(precision, scale)
} else if (scale < 0) {
// Decimal can have negative scale (SPARK-24468). In this case, we cannot allow a precision
// loss since we would cause a loss of digits in the integer part.
// In this case, we are likely to meet an overflow.
DecimalType(MAX_PRECISION, scale)
} else {
// Precision/scale exceed maximum precision. Result must be adjusted to MAX_PRECISION.
val intDigits = precision - scale
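The effect of the new negative-scale branch can be sketched in a few lines of Python; the final branch below is reproduced on the assumption that the truncated code follows Spark's documented adjustment rule with MINIMUM_ADJUSTED_SCALE = 6, so this is an illustration rather than the actual implementation:

MAX_PRECISION = 38
MINIMUM_ADJUSTED_SCALE = 6  # assumed value of Spark's DecimalType constant

def adjust_precision_scale(precision, scale):
    assert precision >= scale
    if precision <= MAX_PRECISION:
        # Already fits; no adjustment needed.
        return precision, scale
    if scale < 0:
        # Negative scale (SPARK-24468): cap the precision but keep the scale,
        # so no digits are dropped from the integer part.
        return MAX_PRECISION, scale
    # Positive scale: give up fractional digits (down to a minimum) to make
    # room for the integer digits.
    int_digits = precision - scale
    min_scale = min(scale, MINIMUM_ADJUSTED_SCALE)
    adjusted_scale = max(MAX_PRECISION - int_digits, min_scale)
    return MAX_PRECISION, adjusted_scale

print(adjust_precision_scale(39, -9))   # (38, -9): precision capped, scale preserved
print(adjust_precision_scale(45, 20))   # (38, 13): scale reduced to fit MAX_PRECISION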
@@ -272,6 +272,15 @@ class DecimalPrecisionSuite extends AnalysisTest with BeforeAndAfter {
}
}

test("SPARK-24468: operations on decimals with negative scale") {
val a = AttributeReference("a", DecimalType(3, -10))()
val b = AttributeReference("b", DecimalType(1, -1))()
val c = AttributeReference("c", DecimalType(35, 1))()
checkType(Multiply(a, b), DecimalType(5, -11))
checkType(Multiply(a, c), DecimalType(38, -9))
checkType(Multiply(b, c), DecimalType(37, 0))
}

/** strength reduction for integer/decimal comparisons */
def ruleTest(initial: Expression, transformed: Expression): Unit = {
val testRelation = LocalRelation(AttributeReference("a", IntegerType)())
@@ -372,9 +372,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

val (functionsWithDistinct, functionsWithoutDistinct) =
aggregateExpressions.partition(_.isDistinct)
if (functionsWithDistinct.map(_.aggregateFunction.children).distinct.length > 1) {
if (functionsWithDistinct.map(_.aggregateFunction.children.toSet).distinct.length > 1) {
// This is a sanity check. We should not reach here when we have multiple distinct
// column sets. Our MultipleDistinctRewriter should take care this case.
// column sets. Our `RewriteDistinctAggregates` should take care of this case.
sys.error("You hit a query analyzer bug. Please report your query to " +
"Spark user mailing list.")
}
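The `.toSet` matters because two DISTINCT aggregates can reference the same columns in different orders; comparing the child sequences directly would wrongly count them as two distinct column sets. A small standalone Python illustration of that comparison (not Spark code; the column names mirror the new group-by.sql test below):

# Children of corr(DISTINCT x, y) and corr(DISTINCT y, x): same columns, different order.
children_1 = ("x", "y")
children_2 = ("y", "x")

# Comparing the raw sequences sees two different groups ...
print(len({children_1, children_2}))                        # 2

# ... while comparing them as sets recognises a single distinct column set,
# which is what the sanity check above needs.
print(len({frozenset(children_1), frozenset(children_2)}))  # 1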
6 changes: 5 additions & 1 deletion sql/core/src/test/resources/sql-tests/inputs/group-by.sql
@@ -68,4 +68,8 @@ SELECT 1 from (
FROM (select 1 as x) a
WHERE false
) b
where b.z != b.z
where b.z != b.z;

-- SPARK-24369 multiple distinct aggregations having the same argument set
SELECT corr(DISTINCT x, y), corr(DISTINCT y, x), count(*)
FROM (VALUES (1, 1), (2, 2), (2, 2)) t(x, y);
@@ -40,12 +40,14 @@ select 10.3000 * 3.0;
select 10.30000 * 30.0;
select 10.300000000000000000 * 3.000000000000000000;
select 10.300000000000000000 * 3.0000000000000000000;
select 2.35E10 * 1.0;

-- arithmetic operations causing an overflow return NULL
select (5e36 + 0.1) + 5e36;
select (-4e36 - 0.1) - 7e36;
select 12345678901234567890.0 * 12345678901234567890.0;
select 1e35 / 0.1;
select 1.2345678901234567890E30 * 1.2345678901234567890E25;

-- arithmetic operations causing a precision loss are truncated
select 12345678912345678912345678912.1234567 + 9999999999999999999999999999999.12345;
@@ -67,12 +69,14 @@ select 10.3000 * 3.0;
select 10.30000 * 30.0;
select 10.300000000000000000 * 3.000000000000000000;
select 10.300000000000000000 * 3.0000000000000000000;
select 2.35E10 * 1.0;

-- arithmetic operations causing an overflow return NULL
select (5e36 + 0.1) + 5e36;
select (-4e36 - 0.1) - 7e36;
select 12345678901234567890.0 * 12345678901234567890.0;
select 1e35 / 0.1;
select 1.2345678901234567890E30 * 1.2345678901234567890E25;

-- arithmetic operations causing a precision loss return NULL
select 12345678912345678912345678912.1234567 + 9999999999999999999999999999999.12345;
11 changes: 10 additions & 1 deletion sql/core/src/test/resources/sql-tests/results/group-by.sql.out
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 26
-- Number of queries: 27


-- !query 0
@@ -241,3 +241,12 @@ where b.z != b.z
struct<1:int>
-- !query 25 output



-- !query 26
SELECT corr(DISTINCT x, y), corr(DISTINCT y, x), count(*)
FROM (VALUES (1, 1), (2, 2), (2, 2)) t(x, y)
-- !query 26 schema
struct<corr(DISTINCT CAST(x AS DOUBLE), CAST(y AS DOUBLE)):double,corr(DISTINCT CAST(y AS DOUBLE), CAST(x AS DOUBLE)):double,count(1):bigint>
-- !query 26 output
1.0 1.0 3