Merge remote-tracking branch 'apache-github/master' into SPARK-19459
# Conflicts:
#	sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
hvanhovell committed Feb 8, 2017
2 parents f42348a + 0077bfc commit e378f62
Showing 91 changed files with 3,362 additions and 1,910 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -44,7 +44,7 @@ notifications:
# 5. Run maven install before running lint-java.
install:
- export MAVEN_SKIP_RC=1
- build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Phadoop-2.3 -Pkinesis-asl -Phive -Phive-thriftserver install
- build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install

# 6. Run lint-java.
script:
2 changes: 1 addition & 1 deletion appveyor.yml
@@ -43,7 +43,7 @@ install:
- cmd: R -e "packageVersion('survival')"

build_script:
- cmd: mvn -DskipTests -Phadoop-2.6 -Psparkr -Phive -Phive-thriftserver package
- cmd: mvn -DskipTests -Psparkr -Phive -Phive-thriftserver package

test_script:
- cmd: .\bin\spark-submit2.cmd --conf spark.hadoop.fs.default.name="file:///" R\pkg\tests\run-all.R
2 changes: 1 addition & 1 deletion assembly/README
@@ -9,4 +9,4 @@ This module is off by default. To activate it specify the profile in the command

If you need to build an assembly for a different version of Hadoop the
hadoop-version system property needs to be set as in this example:
-Dhadoop.version=2.0.6-alpha
-Dhadoop.version=2.7.3
59 changes: 16 additions & 43 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -18,7 +18,6 @@
package org.apache.spark.deploy

import java.io.IOException
import java.lang.reflect.Method
import java.security.PrivilegedExceptionAction
import java.text.DateFormat
import java.util.{Arrays, Comparator, Date, Locale}
@@ -29,7 +28,6 @@ import scala.util.control.NonFatal
import com.google.common.primitives.Longs
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.fs.FileSystem.Statistics
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
@@ -140,54 +138,29 @@ class SparkHadoopUtil extends Logging {
/**
* Returns a function that can be called to find Hadoop FileSystem bytes read. If
* getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will
* return the bytes read on r since t. Reflection is required because thread-level FileSystem
* statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
* Returns None if the required method can't be found.
* return the bytes read on r since t.
*
* @return None if the required method can't be found.
*/
private[spark] def getFSBytesReadOnThreadCallback(): Option[() => Long] = {
try {
val threadStats = getFileSystemThreadStatistics()
val getBytesReadMethod = getFileSystemThreadStatisticsMethod("getBytesRead")
val f = () => threadStats.map(getBytesReadMethod.invoke(_).asInstanceOf[Long]).sum
val baselineBytesRead = f()
Some(() => f() - baselineBytesRead)
} catch {
case e @ (_: NoSuchMethodException | _: ClassNotFoundException) =>
logDebug("Couldn't find method for retrieving thread-level FileSystem input data", e)
None
}
private[spark] def getFSBytesReadOnThreadCallback(): () => Long = {
val threadStats = FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics)
val f = () => threadStats.map(_.getBytesRead).sum
val baselineBytesRead = f()
() => f() - baselineBytesRead
}

/**
* Returns a function that can be called to find Hadoop FileSystem bytes written. If
* getFSBytesWrittenOnThreadCallback is called from thread r at time t, the returned callback will
* return the bytes written on r since t. Reflection is required because thread-level FileSystem
* statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
* Returns None if the required method can't be found.
* return the bytes written on r since t.
*
* @return None if the required method can't be found.
*/
private[spark] def getFSBytesWrittenOnThreadCallback(): Option[() => Long] = {
try {
val threadStats = getFileSystemThreadStatistics()
val getBytesWrittenMethod = getFileSystemThreadStatisticsMethod("getBytesWritten")
val f = () => threadStats.map(getBytesWrittenMethod.invoke(_).asInstanceOf[Long]).sum
val baselineBytesWritten = f()
Some(() => f() - baselineBytesWritten)
} catch {
case e @ (_: NoSuchMethodException | _: ClassNotFoundException) =>
logDebug("Couldn't find method for retrieving thread-level FileSystem output data", e)
None
}
}

private def getFileSystemThreadStatistics(): Seq[AnyRef] = {
FileSystem.getAllStatistics.asScala.map(
Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
}

private def getFileSystemThreadStatisticsMethod(methodName: String): Method = {
val statisticsDataClass =
Utils.classForName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
statisticsDataClass.getDeclaredMethod(methodName)
private[spark] def getFSBytesWrittenOnThreadCallback(): () => Long = {
val threadStats = FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics)
val f = () => threadStats.map(_.getBytesWritten).sum
val baselineBytesWritten = f()
() => f() - baselineBytesWritten
}

/**
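
For reference, the simplified callbacks above follow a baseline-and-delta pattern over Hadoop's thread-level FileSystem statistics, which can be used directly once Hadoop 2.6+ is the minimum. The sketch below is illustrative only — the object name, input path, and main method are not part of this patch — but it shows the same pattern in a self-contained program:

    import scala.collection.JavaConverters._

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object BytesReadCallbackSketch {
      // Snapshot thread-level statistics now; the returned function reports how many
      // bytes this thread has read through Hadoop FileSystems since the snapshot.
      def bytesReadOnThreadCallback(): () => Long = {
        val threadStats = FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics)
        val current = () => threadStats.map(_.getBytesRead).sum
        val baseline = current()
        () => current() - baseline
      }

      def main(args: Array[String]): Unit = {
        val fs = FileSystem.getLocal(new Configuration())
        val bytesReadSince = bytesReadOnThreadCallback()
        val in = fs.open(new Path("/etc/hosts"))  // illustrative input path
        try {
          val buf = new Array[Byte](4096)
          while (in.read(buf) != -1) {}
        } finally {
          in.close()
        }
        println(s"Bytes read on this thread: ${bytesReadSince()}")
      }
    }
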
core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
@@ -125,8 +125,7 @@ object SparkHadoopMapReduceWriter extends Logging {
val taskContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
committer.setupTask(taskContext)

val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] =
SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
val (outputMetrics, callback) = SparkHadoopWriterUtils.initHadoopOutputMetrics(context)

// Initiate the writer.
val taskFormat = outputFormat.newInstance()
@@ -149,8 +148,7 @@
writer.write(pair._1, pair._2)

// Update bytes written metric every few records
SparkHadoopWriterUtils.maybeUpdateOutputMetrics(
outputMetricsAndBytesWrittenCallback, recordsWritten)
SparkHadoopWriterUtils.maybeUpdateOutputMetrics(outputMetrics, callback, recordsWritten)
recordsWritten += 1
}
if (writer != null) {
@@ -171,11 +169,8 @@
}
})

outputMetricsAndBytesWrittenCallback.foreach {
case (om, callback) =>
om.setBytesWritten(callback())
om.setRecordsWritten(recordsWritten)
}
outputMetrics.setBytesWritten(callback())
outputMetrics.setRecordsWritten(recordsWritten)

ret
} catch {
@@ -222,24 +217,18 @@ object SparkHadoopWriterUtils {
// TODO: these don't seem like the right abstractions.
// We should abstract the duplicate code in a less awkward way.

// return type: (output metrics, bytes written callback), defined only if the latter is defined
def initHadoopOutputMetrics(
context: TaskContext): Option[(OutputMetrics, () => Long)] = {
def initHadoopOutputMetrics(context: TaskContext): (OutputMetrics, () => Long) = {
val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback()
bytesWrittenCallback.map { b =>
(context.taskMetrics().outputMetrics, b)
}
(context.taskMetrics().outputMetrics, bytesWrittenCallback)
}

def maybeUpdateOutputMetrics(
outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)],
outputMetrics: OutputMetrics,
callback: () => Long,
recordsWritten: Long): Unit = {
if (recordsWritten % RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) {
outputMetricsAndBytesWrittenCallback.foreach {
case (om, callback) =>
om.setBytesWritten(callback())
om.setRecordsWritten(recordsWritten)
}
outputMetrics.setBytesWritten(callback())
outputMetrics.setRecordsWritten(recordsWritten)
}
}

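
The net effect of this change is that initHadoopOutputMetrics no longer returns an Option, so callers update the metrics directly instead of pattern-matching on an optional tuple. The stand-in below sketches that periodic-update shape; OutputMetrics here is a simplified placeholder rather than Spark's class, and all names and constants are illustrative:

    object PeriodicOutputMetricsSketch {
      private val RecordsBetweenUpdates = 256  // stand-in for RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES

      // Simplified placeholder for org.apache.spark.executor.OutputMetrics.
      final class OutputMetrics {
        private var bytes = 0L
        private var records = 0L
        def setBytesWritten(v: Long): Unit = bytes = v
        def setRecordsWritten(v: Long): Unit = records = v
        override def toString: String = s"OutputMetrics(bytes=$bytes, records=$records)"
      }

      // After the change: the metrics object and the bytes-written callback are always present.
      def maybeUpdateOutputMetrics(metrics: OutputMetrics, callback: () => Long, recordsWritten: Long): Unit = {
        if (recordsWritten % RecordsBetweenUpdates == 0) {
          metrics.setBytesWritten(callback())
          metrics.setRecordsWritten(recordsWritten)
        }
      }

      def main(args: Array[String]): Unit = {
        var bytesOnDisk = 0L
        val metrics = new OutputMetrics
        val callback = () => bytesOnDisk
        var recordsWritten = 0L
        (1 to 1000).foreach { _ =>
          bytesOnDisk += 42  // pretend each record writes 42 bytes
          maybeUpdateOutputMetrics(metrics, callback, recordsWritten)
          recordsWritten += 1
        }
        metrics.setBytesWritten(callback())  // final update, as in the write task above
        metrics.setRecordsWritten(recordsWritten)
        println(metrics)
      }
    }
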
66 changes: 13 additions & 53 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -25,15 +25,7 @@ import scala.collection.immutable.Map
import scala.reflect.ClassTag

import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.mapred.FileSplit
import org.apache.hadoop.mapred.InputFormat
import org.apache.hadoop.mapred.InputSplit
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.JobID
import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.mapred.TaskAttemptID
import org.apache.hadoop.mapred.TaskID
import org.apache.hadoop.mapred._
import org.apache.hadoop.mapred.lib.CombineFileSplit
import org.apache.hadoop.mapreduce.TaskType
import org.apache.hadoop.util.ReflectionUtils
@@ -47,7 +39,7 @@ import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{NextIterator, SerializableConfiguration, ShutdownHookManager, Utils}
import org.apache.spark.util.{NextIterator, SerializableConfiguration, ShutdownHookManager}

/**
* A Spark split class that wraps around a Hadoop InputSplit.
@@ -229,11 +221,11 @@ class HadoopRDD[K, V](
// creating RecordReader, because RecordReader's constructor might read some bytes
private val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match {
case _: FileSplit | _: CombineFileSplit =>
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
Some(SparkHadoopUtil.get.getFSBytesReadOnThreadCallback())
case _ => None
}

// For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
// We get our input bytes from thread-local Hadoop FileSystem statistics.
// If we do a coalesce, however, we are likely to compute multiple partitions in the same
// task and in the same thread, in which case we need to avoid override values written by
// previous partitions (SPARK-13071).
@@ -280,13 +272,9 @@ class HadoopRDD[K, V](
(key, value)
}

override def close() {
override def close(): Unit = {
if (reader != null) {
InputFileBlockHolder.unset()
// Close the reader and release it. Note: it's very important that we don't close the
// reader more than once, since that exposes us to MAPREDUCE-5918 when running against
// Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
// corruption issues when reading compressed input.
try {
reader.close()
} catch {
@@ -326,18 +314,10 @@

override def getPreferredLocations(split: Partition): Seq[String] = {
val hsplit = split.asInstanceOf[HadoopPartition].inputSplit.value
val locs: Option[Seq[String]] = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
case Some(c) =>
try {
val lsplit = c.inputSplitWithLocationInfo.cast(hsplit)
val infos = c.getLocationInfo.invoke(lsplit).asInstanceOf[Array[AnyRef]]
HadoopRDD.convertSplitLocationInfo(infos)
} catch {
case e: Exception =>
logDebug("Failed to use InputSplitWithLocations.", e)
None
}
case None => None
val locs = hsplit match {
case lsplit: InputSplitWithLocationInfo =>
HadoopRDD.convertSplitLocationInfo(lsplit.getLocationInfo)
case _ => None
}
locs.getOrElse(hsplit.getLocations.filter(_ != "localhost"))
}
@@ -413,32 +393,12 @@ private[spark] object HadoopRDD extends Logging {
}
}

private[spark] class SplitInfoReflections {
val inputSplitWithLocationInfo =
Utils.classForName("org.apache.hadoop.mapred.InputSplitWithLocationInfo")
val getLocationInfo = inputSplitWithLocationInfo.getMethod("getLocationInfo")
val newInputSplit = Utils.classForName("org.apache.hadoop.mapreduce.InputSplit")
val newGetLocationInfo = newInputSplit.getMethod("getLocationInfo")
val splitLocationInfo = Utils.classForName("org.apache.hadoop.mapred.SplitLocationInfo")
val isInMemory = splitLocationInfo.getMethod("isInMemory")
val getLocation = splitLocationInfo.getMethod("getLocation")
}

private[spark] val SPLIT_INFO_REFLECTIONS: Option[SplitInfoReflections] = try {
Some(new SplitInfoReflections)
} catch {
case e: Exception =>
logDebug("SplitLocationInfo and other new Hadoop classes are " +
"unavailable. Using the older Hadoop location info code.", e)
None
}

private[spark] def convertSplitLocationInfo(infos: Array[AnyRef]): Option[Seq[String]] = {
private[spark] def convertSplitLocationInfo(
infos: Array[SplitLocationInfo]): Option[Seq[String]] = {
Option(infos).map(_.flatMap { loc =>
val reflections = HadoopRDD.SPLIT_INFO_REFLECTIONS.get
val locationStr = reflections.getLocation.invoke(loc).asInstanceOf[String]
val locationStr = loc.getLocation
if (locationStr != "localhost") {
if (reflections.isInMemory.invoke(loc).asInstanceOf[Boolean]) {
if (loc.isInMemory) {
logDebug(s"Partition $locationStr is cached by Hadoop.")
Some(HDFSCacheTaskLocation(locationStr).toString)
} else {
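
With Hadoop 2.6 as the minimum, the split-location lookup above reduces to a plain pattern match on InputSplitWithLocationInfo; no reflective class loading is required. A rough, self-contained sketch of that shape (the object and method names are illustrative, only the Hadoop types are real):

    import org.apache.hadoop.mapred.{InputSplit, InputSplitWithLocationInfo, SplitLocationInfo}

    object SplitLocationsSketch {
      // Prefer hosts reported via SplitLocationInfo; fall back to getLocations,
      // dropping the unhelpful "localhost" entries, as HadoopRDD does above.
      def preferredHosts(split: InputSplit): Seq[String] = {
        val fromLocationInfo: Option[Seq[String]] = split match {
          case s: InputSplitWithLocationInfo => convert(s.getLocationInfo)
          case _ => None
        }
        fromLocationInfo.getOrElse(split.getLocations.filter(_ != "localhost"))
      }

      private def convert(infos: Array[SplitLocationInfo]): Option[Seq[String]] = {
        Option(infos).map(_.flatMap { loc =>
          val host = loc.getLocation
          // loc.isInMemory could additionally be used to mark HDFS-cached replicas,
          // as the real code does with HDFSCacheTaskLocation.
          if (host != "localhost") Some(host) else None
        })
      }
    }
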
23 changes: 4 additions & 19 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -152,11 +152,11 @@ class NewHadoopRDD[K, V](
private val getBytesReadCallback: Option[() => Long] =
split.serializableHadoopSplit.value match {
case _: FileSplit | _: CombineFileSplit =>
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
Some(SparkHadoopUtil.get.getFSBytesReadOnThreadCallback())
case _ => None
}

// For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
// We get our input bytes from thread-local Hadoop FileSystem statistics.
// If we do a coalesce, however, we are likely to compute multiple partitions in the same
// task and in the same thread, in which case we need to avoid override values written by
// previous partitions (SPARK-13071).
@@ -231,13 +231,9 @@ class NewHadoopRDD[K, V](
(reader.getCurrentKey, reader.getCurrentValue)
}

private def close() {
private def close(): Unit = {
if (reader != null) {
InputFileBlockHolder.unset()
// Close the reader and release it. Note: it's very important that we don't close the
// reader more than once, since that exposes us to MAPREDUCE-5918 when running against
// Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
// corruption issues when reading compressed input.
try {
reader.close()
} catch {
@@ -277,18 +273,7 @@

override def getPreferredLocations(hsplit: Partition): Seq[String] = {
val split = hsplit.asInstanceOf[NewHadoopPartition].serializableHadoopSplit.value
val locs = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
case Some(c) =>
try {
val infos = c.newGetLocationInfo.invoke(split).asInstanceOf[Array[AnyRef]]
HadoopRDD.convertSplitLocationInfo(infos)
} catch {
case e : Exception =>
logDebug("Failed to use InputSplit#getLocationInfo.", e)
None
}
case None => None
}
val locs = HadoopRDD.convertSplitLocationInfo(split.getLocationInfo)
locs.getOrElse(split.getLocations.filter(_ != "localhost"))
}

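
The thread-statistics callbacks simplified above are what feed the input-bytes metrics when reading through the new Hadoop API. As a rough user-level illustration (the app name and input path are made up, not part of this patch), a job like the following exercises the NewHadoopRDD read path touched here:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    import org.apache.spark.{SparkConf, SparkContext}

    object NewHadoopReadSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("new-hadoop-read-sketch").setMaster("local[2]"))
        try {
          // Reads through NewHadoopRDD; bytes read are tracked via the thread-level
          // FileSystem statistics callback from SparkHadoopUtil shown earlier.
          val lines = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat]("/tmp/input.txt")
          println(s"line count: ${lines.count()}")
        } finally {
          sc.stop()
        }
      }
    }
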
15 changes: 5 additions & 10 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -37,8 +37,7 @@ import org.apache.spark._
import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.annotation.Experimental
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.OutputMetrics
import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol, SparkHadoopMapReduceWriter, SparkHadoopWriterUtils}
import org.apache.spark.internal.io.{SparkHadoopMapReduceWriter, SparkHadoopWriterUtils}
import org.apache.spark.internal.Logging
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.serializer.Serializer
@@ -1126,8 +1125,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt

val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] =
SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
val (outputMetrics, callback) = SparkHadoopWriterUtils.initHadoopOutputMetrics(context)

writer.setup(context.stageId, context.partitionId, taskAttemptId)
writer.open()
@@ -1139,16 +1137,13 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])

// Update bytes written metric every few records
SparkHadoopWriterUtils.maybeUpdateOutputMetrics(
outputMetricsAndBytesWrittenCallback, recordsWritten)
SparkHadoopWriterUtils.maybeUpdateOutputMetrics(outputMetrics, callback, recordsWritten)
recordsWritten += 1
}
}(finallyBlock = writer.close())
writer.commit()
outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
om.setBytesWritten(callback())
om.setRecordsWritten(recordsWritten)
}
outputMetrics.setBytesWritten(callback())
outputMetrics.setRecordsWritten(recordsWritten)
}

self.context.runJob(self, writeToFile)
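
On the write side, the same simplification applies to the metrics plumbing behind the public save APIs. A minimal, hypothetical usage of that code path (the app name and output path are illustrative):

    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

    import org.apache.spark.{SparkConf, SparkContext}

    object PairSaveSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("pair-save-sketch").setMaster("local[2]"))
        try {
          val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "c" -> 3))
            .map { case (k, v) => (new Text(k), new IntWritable(v)) }
          // saveAsNewAPIHadoopFile goes through SparkHadoopMapReduceWriter, which after
          // this change updates OutputMetrics directly via the always-available callback.
          pairs.saveAsNewAPIHadoopFile[TextOutputFormat[Text, IntWritable]]("/tmp/pair-save-sketch-out")
        } finally {
          sc.stop()
        }
      }
    }
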