Merge pull request apache#307 from palantir/ds/sync-upstream
dansanduleac authored Feb 6, 2018
2 parents e017c20 + c05f7c4 commit 60c250f
Showing 190 changed files with 2,272 additions and 936 deletions.
4 changes: 2 additions & 2 deletions R/pkg/R/DataFrame.R
@@ -2090,8 +2090,8 @@ setMethod("selectExpr",
#'
#' @param x a SparkDataFrame.
#' @param colName a column name.
- #' @param col a Column expression (which must refer only to this DataFrame), or an atomic vector in
- #' the length of 1 as literal value.
+ #' @param col a Column expression (which must refer only to this SparkDataFrame), or an atomic
+ #' vector in the length of 1 as literal value.
#' @return A SparkDataFrame with the new column added or the existing column replaced.
#' @family SparkDataFrame functions
#' @aliases withColumn,SparkDataFrame,character-method
@@ -171,7 +171,9 @@ private class DownloadCallback implements StreamCallback {

@Override
public void onData(String streamId, ByteBuffer buf) throws IOException {
- channel.write(buf);
+ while (buf.hasRemaining()) {
+   channel.write(buf);
+ }
}

@Override
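This hunk, and the matching FileSuite change near the end of the diff, guard against partial writes: WritableByteChannel.write() may consume only part of the buffer in a single call, so the write is retried until the buffer is drained. A minimal sketch of the pattern with a hypothetical writeFully helper (not part of this commit):

    import java.nio.ByteBuffer
    import java.nio.channels.WritableByteChannel

    // Keep writing until the buffer is drained; a single write() call may
    // consume only part of the remaining bytes.
    def writeFully(channel: WritableByteChannel, buf: ByteBuffer): Unit = {
      while (buf.hasRemaining) {
        channel.write(buf)
      }
    }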
@@ -172,10 +172,7 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
currentEntry = sortedConsumers.lastEntry();
}
List<MemoryConsumer> cList = currentEntry.getValue();
- MemoryConsumer c = cList.remove(cList.size() - 1);
- if (cList.isEmpty()) {
-   sortedConsumers.remove(currentEntry.getKey());
- }
+ MemoryConsumer c = cList.get(cList.size() - 1);
try {
long released = c.spill(required - got, consumer);
if (released > 0) {
@@ -185,6 +182,11 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
if (got >= required) {
break;
}
+ } else {
+   cList.remove(cList.size() - 1);
+   if (cList.isEmpty()) {
+     sortedConsumers.remove(currentEntry.getKey());
+   }
}
} catch (ClosedByInterruptException e) {
// This called by user to kill a task (e.g: speculative task).
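The change above makes acquireExecutionMemory peek at the biggest candidate consumer with get() and only drop it from the sorted map once a spill attempt releases nothing, so a consumer that did free memory stays eligible for further spills. A rough sketch of that control flow, not the Spark implementation; String stands in for MemoryConsumer and spill for MemoryConsumer.spill():

    import java.util.{ArrayList, TreeMap}

    // Keys are the consumers' memory usage; the largest bucket is tried first.
    def reclaim(sorted: TreeMap[Long, ArrayList[String]], required: Long,
        spill: String => Long): Long = {
      var got = 0L
      while (got < required && !sorted.isEmpty) {
        val entry = sorted.lastEntry()
        val consumers = entry.getValue
        val candidate = consumers.get(consumers.size - 1)   // peek, do not remove yet
        val released = spill(candidate)
        if (released > 0) {
          got += released                        // keep the consumer; it may spill again later
        } else {
          consumers.remove(consumers.size - 1)   // released nothing: drop it so it is not retried
          if (consumers.isEmpty) sorted.remove(entry.getKey)
        }
      }
      got
    }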
@@ -76,8 +76,10 @@ public UnsafeSorterSpillReader(
SparkEnv.get() == null ? 0.5 :
SparkEnv.get().conf().getDouble("spark.unsafe.sorter.spill.read.ahead.fraction", 0.5);

+ // SPARK-23310: Disable read-ahead input stream, because it is causing lock contention
+ // and perf regression for TPC-DS queries.
final boolean readAheadEnabled = SparkEnv.get() != null &&
-   SparkEnv.get().conf().getBoolean("spark.unsafe.sorter.spill.read.ahead.enabled", true);
+   SparkEnv.get().conf().getBoolean("spark.unsafe.sorter.spill.read.ahead.enabled", false);

final InputStream bs =
new NioBufferedFileInputStream(file, (int) bufferSizeBytes);
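The default of the read-ahead flag flips from true to false because of the lock contention and TPC-DS regression called out in the new comment; the read-ahead code path itself remains. A hedged sketch of opting back in through the existing configuration key (key name taken from the hunk above):

    import org.apache.spark.SparkConf

    // Re-enable the read-ahead input stream for spill reads if desired.
    val conf = new SparkConf()
      .set("spark.unsafe.sorter.spill.read.ahead.enabled", "true")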
@@ -37,16 +37,16 @@ private[spark] object PythonEvalType {

val SQL_BATCHED_UDF = 100

- val SQL_PANDAS_SCALAR_UDF = 200
- val SQL_PANDAS_GROUP_MAP_UDF = 201
- val SQL_PANDAS_GROUP_AGG_UDF = 202
+ val SQL_SCALAR_PANDAS_UDF = 200
+ val SQL_GROUPED_MAP_PANDAS_UDF = 201
+ val SQL_GROUPED_AGG_PANDAS_UDF = 202

def toString(pythonEvalType: Int): String = pythonEvalType match {
case NON_UDF => "NON_UDF"
case SQL_BATCHED_UDF => "SQL_BATCHED_UDF"
- case SQL_PANDAS_SCALAR_UDF => "SQL_PANDAS_SCALAR_UDF"
- case SQL_PANDAS_GROUP_MAP_UDF => "SQL_PANDAS_GROUP_MAP_UDF"
- case SQL_PANDAS_GROUP_AGG_UDF => "SQL_PANDAS_GROUP_AGG_UDF"
+ case SQL_SCALAR_PANDAS_UDF => "SQL_SCALAR_PANDAS_UDF"
+ case SQL_GROUPED_MAP_PANDAS_UDF => "SQL_GROUPED_MAP_PANDAS_UDF"
+ case SQL_GROUPED_AGG_PANDAS_UDF => "SQL_GROUPED_AGG_PANDAS_UDF"
}
}

@@ -141,19 +141,6 @@ private[spark] class IndexShuffleBlockResolver(
val indexFile = getIndexFile(shuffleId, mapId)
val indexTmp = Utils.tempFileWith(indexFile)
try {
- val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
- Utils.tryWithSafeFinally {
-   // We take in lengths of each block, need to convert it to offsets.
-   var offset = 0L
-   out.writeLong(offset)
-   for (length <- lengths) {
-     offset += length
-     out.writeLong(offset)
-   }
- } {
-   out.close()
- }
-
val dataFile = getDataFile(shuffleId, mapId)
// There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
// the following check and rename are atomic.
@@ -166,10 +153,22 @@ private[spark] class IndexShuffleBlockResolver(
if (dataTmp != null && dataTmp.exists()) {
dataTmp.delete()
}
- indexTmp.delete()
} else {
// This is the first successful attempt in writing the map outputs for this task,
// so override any existing index and data files with the ones we wrote.
+ val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
+ Utils.tryWithSafeFinally {
+   // We take in lengths of each block, need to convert it to offsets.
+   var offset = 0L
+   out.writeLong(offset)
+   for (length <- lengths) {
+     offset += length
+     out.writeLong(offset)
+   }
+ } {
+   out.close()
+ }
+
if (indexFile.exists()) {
indexFile.delete()
}
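With this change the temporary index file is written only in the else branch, i.e. when the current attempt is the first to commit the map outputs for the task, so a losing attempt no longer creates (and then deletes) a temp index file; the lengths-to-offsets loop moves along with it. For reference, that loop is a running prefix sum over the block lengths, sketched here as a hypothetical helper (not in the commit):

    // offsets(0) = 0 and offsets(i) = offsets(i - 1) + lengths(i - 1),
    // so the output has one more element than the input.
    def toOffsets(lengths: Array[Long]): Array[Long] =
      lengths.scanLeft(0L)(_ + _)

    // toOffsets(Array(10L, 20L, 5L)) == Array(0L, 10L, 30L, 35L)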
@@ -875,8 +875,8 @@ private[spark] class AppStatusListener(
return
}

- val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[JobDataWrapper]),
-   countToDelete.toInt) { j =>
+ val view = kvstore.view(classOf[JobDataWrapper]).index("completionTime").first(0L)
+ val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { j =>
j.info.status != JobExecutionStatus.RUNNING && j.info.status != JobExecutionStatus.UNKNOWN
}
toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) }
@@ -888,8 +888,8 @@ private[spark] class AppStatusListener(
return
}

- val stages = KVUtils.viewToSeq(kvstore.view(classOf[StageDataWrapper]),
-   countToDelete.toInt) { s =>
+ val view = kvstore.view(classOf[StageDataWrapper]).index("completionTime").first(0L)
+ val stages = KVUtils.viewToSeq(view, countToDelete.toInt) { s =>
s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING
}

@@ -945,8 +945,9 @@ private[spark] class AppStatusListener(
val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), maxTasksPerStage).toInt
if (countToDelete > 0) {
val stageKey = Array(stage.info.stageId, stage.info.attemptNumber)
- val view = kvstore.view(classOf[TaskDataWrapper]).index("stage").first(stageKey)
-   .last(stageKey)
+ val view = kvstore.view(classOf[TaskDataWrapper])
+   .index(TaskIndexNames.COMPLETION_TIME)
+   .parent(stageKey)

// Try to delete finished tasks only.
val toDelete = KVUtils.viewToSeq(view, countToDelete) { t =>
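The three cleanup paths above switch from the default insertion-order view to the new "completionTime" index, so the oldest finished entities are evicted first. For jobs and stages the index value is -1L while the entity is still running (see the storeTypes.scala change below), so starting the ascending scan at first(0L) also keeps in-flight entries out of the deletion candidates. A toy illustration of that skip, using a plain sorted map instead of the KVStore (illustrative only):

    import scala.collection.immutable.TreeMap

    // Keys play the role of the "completionTime" index; -1L marks a running entry.
    val byCompletion = TreeMap(
      -1L -> "running job",
      100L -> "oldest finished job",
      250L -> "newer finished job")
    val candidates = byCompletion.iteratorFrom(0L).map(_._2).toList
    // candidates == List("oldest finished job", "newer finished job"); the running job is skipped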
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/status/AppStatusUtils.scala
@@ -17,16 +17,23 @@

package org.apache.spark.status

- import org.apache.spark.status.api.v1.{TaskData, TaskMetrics}
+ import org.apache.spark.status.api.v1.TaskData

private[spark] object AppStatusUtils {

+ private val TASK_FINISHED_STATES = Set("FAILED", "KILLED", "SUCCESS")
+
+ private def isTaskFinished(task: TaskData): Boolean = {
+   TASK_FINISHED_STATES.contains(task.status)
+ }
+
def schedulerDelay(task: TaskData): Long = {
- if (task.taskMetrics.isDefined && task.duration.isDefined) {
+ if (isTaskFinished(task) && task.taskMetrics.isDefined && task.duration.isDefined) {
val m = task.taskMetrics.get
schedulerDelay(task.launchTime.getTime(), fetchStart(task), task.duration.get,
m.executorDeserializeTime, m.resultSerializationTime, m.executorRunTime)
} else {
+ // The task is still running and the metrics like executorRunTime are not available.
0L
}
}
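The guard above means schedulerDelay is only derived from metrics once a task has reached a terminal state; a task that is still running reports 0 ms rather than a delay computed from incomplete metrics. A small sketch of the guard's effect (the status strings come from the diff; the actual delay computation is stubbed out):

    val finishedStates = Set("FAILED", "KILLED", "SUCCESS")

    // Returns the computed delay only for finished tasks, 0 otherwise.
    def schedulerDelayOrZero(status: String, computeDelay: () => Long): Long =
      if (finishedStates.contains(status)) computeDelay() else 0L

    // schedulerDelayOrZero("RUNNING", () => 42L) == 0L
    // schedulerDelayOrZero("SUCCESS", () => 42L) == 42L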
7 changes: 7 additions & 0 deletions core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -73,6 +73,8 @@ private[spark] class JobDataWrapper(
@JsonIgnore @KVIndex
private def id: Int = info.jobId

@JsonIgnore @KVIndex("completionTime")
private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1L)
}

private[spark] class StageDataWrapper(
@@ -90,6 +92,8 @@ private[spark] class StageDataWrapper(
@JsonIgnore @KVIndex("active")
private def active: Boolean = info.status == StageStatus.ACTIVE

@JsonIgnore @KVIndex("completionTime")
private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1L)
}

/**
@@ -134,6 +138,7 @@ private[spark] object TaskIndexNames {
final val STAGE = "stage"
final val STATUS = "sta"
final val TASK_INDEX = "idx"
+ final val COMPLETION_TIME = "ct"
}

/**
@@ -337,6 +342,8 @@ private[spark] class TaskDataWrapper(
@JsonIgnore @KVIndex(value = TaskIndexNames.ERROR, parent = TaskIndexNames.STAGE)
private def error: String = if (errorMessage.isDefined) errorMessage.get else ""

+ @JsonIgnore @KVIndex(value = TaskIndexNames.COMPLETION_TIME, parent = TaskIndexNames.STAGE)
+ private def completionTime: Long = launchTime + duration
}

private[spark] class RDDStorageInfoWrapper(val info: RDDStorageInfo) {
@@ -157,12 +157,24 @@ private void inProcessLauncherTestImpl() throws Exception {

SparkAppHandle handle = null;
try {
- handle = new InProcessLauncher()
-   .setMaster("local")
-   .setAppResource(SparkLauncher.NO_RESOURCE)
-   .setMainClass(InProcessTestApp.class.getName())
-   .addAppArgs("hello")
-   .startApplication(listener);
+ synchronized (InProcessTestApp.LOCK) {
+   handle = new InProcessLauncher()
+     .setMaster("local")
+     .setAppResource(SparkLauncher.NO_RESOURCE)
+     .setMainClass(InProcessTestApp.class.getName())
+     .addAppArgs("hello")
+     .startApplication(listener);
+
+   // SPARK-23020: see doc for InProcessTestApp.LOCK for a description of the race. Here
+   // we wait until we know that the connection between the app and the launcher has been
+   // established before allowing the app to finish.
+   final SparkAppHandle _handle = handle;
+   eventually(Duration.ofSeconds(5), Duration.ofMillis(10), () -> {
+     assertNotEquals(SparkAppHandle.State.UNKNOWN, _handle.getState());
+   });
+
+   InProcessTestApp.LOCK.wait(5000);
+ }

waitFor(handle);
assertEquals(SparkAppHandle.State.FINISHED, handle.getState());
@@ -193,10 +205,26 @@ public static void main(String[] args) throws Exception {

public static class InProcessTestApp {

+ /**
+  * SPARK-23020: there's a race caused by a child app finishing too quickly. This would cause
+  * the InProcessAppHandle to dispose of itself even before the child connection was properly
+  * established, so no state changes would be detected for the application and its final
+  * state would be LOST.
+  *
+  * It's not really possible to fix that race safely in the handle code itself without changing
+  * the way in-process apps talk to the launcher library, so we work around that in the test by
+  * synchronizing on this object.
+  */
+ public static final Object LOCK = new Object();
+
public static void main(String[] args) throws Exception {
assertNotEquals(0, args.length);
assertEquals(args[0], "hello");
new SparkContext().stop();

+ synchronized (LOCK) {
+   LOCK.notifyAll();
+ }
}

}
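The test change above works around the SPARK-23020 race by keeping the child application from finishing until the launcher has seen the connection: the test thread holds InProcessTestApp.LOCK while it polls the handle state, and only releases the monitor inside LOCK.wait(5000), so the app's closing notifyAll() (and therefore the end of its main()) cannot happen before the state check has passed. A self-contained toy of that hand-off, with illustrative names rather than the test's:

    object HandoffSketch extends App {
      private val Lock = new Object
      @volatile private var connected = false      // stands in for handle.getState() != UNKNOWN

      private val child = new Thread(() => {
        connected = true                           // the "app" comes up and connects
        Lock.synchronized { Lock.notifyAll() }     // cannot run until the driver releases the monitor
      })

      Lock.synchronized {
        child.start()
        while (!connected) Thread.sleep(10)        // poll, like eventually() in the test
        Lock.wait(5000)                            // releases the monitor; the child may now finish
      }
      child.join()
    }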
5 changes: 4 additions & 1 deletion core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -244,7 +244,10 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
for (i <- 0 until testOutputCopies) {
// Shift values by i so that they're different in the output
val alteredOutput = testOutput.map(b => (b + i).toByte)
- channel.write(ByteBuffer.wrap(alteredOutput))
+ val buffer = ByteBuffer.wrap(alteredOutput)
+ while (buffer.hasRemaining) {
+   channel.write(buffer)
+ }
}
channel.close()
file.close()