[YSPARK-711] SparkSession DataFrameReader not showing byte/records Read task stats

Thomas Graves committed Jun 28, 2017
1 parent 421c4f6 commit 5f3d0ee
Showing 3 changed files with 6 additions and 8 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -227,7 +227,7 @@ class HadoopRDD[K, V](
   // creating RecordReader, because RecordReader's constructor might read some bytes
   private val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match {
     case _: FileSplit | _: CombineFileSplit =>
-      SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+      Some(SparkHadoopUtil.get.getFSBytesReadOnThreadCallback())
     case _ => None
   }

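The change in both Hadoop RDDs is the same one-liner: SparkHadoopUtil.get.getFSBytesReadOnThreadCallback() now returns the callback function directly rather than an Option, so the FileSplit/CombineFileSplit arms wrap the result in Some(...) to keep the field's Option[() => Long] type, with None still the fallback for other split types. For context, here is a minimal sketch of what such a thread-local bytes-read callback can look like, assuming Hadoop 2.5+ per-thread FileSystem statistics; the method name here is illustrative and the real SparkHadoopUtil implementation may differ in detail:

import scala.collection.JavaConverters._
import org.apache.hadoop.fs.FileSystem

// Sketch only: build a closure that reports bytes read on the current thread
// since the closure was created, using Hadoop's per-thread FS statistics.
def bytesReadOnThreadCallback(): () => Long = {
  val threadStats = FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics)
  val sumBytes = () => threadStats.map(_.getBytesRead).sum
  val baseline = sumBytes()     // bytes already counted before we start
  () => sumBytes() - baseline   // delta observed by this thread since then
}

The identical wrapping appears in NewHadoopRDD.scala below.
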
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -150,7 +150,7 @@ class NewHadoopRDD[K, V](
   private val getBytesReadCallback: Option[() => Long] =
     split.serializableHadoopSplit.value match {
       case _: FileSplit | _: CombineFileSplit =>
-        SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+        Some(SparkHadoopUtil.get.getFSBytesReadOnThreadCallback())
       case _ => None
     }

10 changes: 4 additions & 6 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -74,23 +74,21 @@ class FileScanRDD(

   // Find a function that will return the FileSystem bytes read by this thread. Do this before
   // apply readFunction, because it might read some bytes.
-  private val getBytesReadCallback: Option[() => Long] =
+  private val getBytesReadCallback =
     SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()

-  // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+  // We get our input bytes from thread-local Hadoop FileSystem statistics.
   // If we do a coalesce, however, we are likely to compute multiple partitions in the same
   // task and in the same thread, in which case we need to avoid override values written by
   // previous partitions (SPARK-13071).
   private def updateBytesRead(): Unit = {
-    getBytesReadCallback.foreach { getBytesRead =>
-      inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
-    }
+    inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback())
   }

   // If we can't get the bytes read from the FS stats, fall back to the file size,
   // which may be inaccurate.
   private def updateBytesReadWithFileSize(): Unit = {
-    if (getBytesReadCallback.isEmpty && currentFile != null) {
+    if (currentFile != null) {
       inputMetrics.incBytesRead(currentFile.length)
     }
   }
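The updateBytesRead rewrite preserves the SPARK-13071 behavior described in the comment: setBytesRead overwrites the metric, and after a coalesce a single task thread can compute several partitions, each with a callback whose baseline starts fresh. The sketch below illustrates why the metric is published as existingBytesRead plus the current callback's delta; SimpleMetrics and readPartitions are hypothetical stand-ins for illustration, not Spark API:

// Illustration of the SPARK-13071 accumulation pattern.
class SimpleMetrics {
  private var bytesRead = 0L
  def setBytesRead(v: Long): Unit = bytesRead = v // overwrites, does not add
  def value: Long = bytesRead
}

def readPartitions(partitionSizes: Seq[Long]): Long = {
  val metrics = new SimpleMetrics
  var existingBytesRead = 0L              // bytes from partitions already done
  for (size <- partitionSizes) {
    // Each partition gets a fresh callback whose baseline starts at zero,
    // standing in for getFSBytesReadOnThreadCallback() on a reused thread.
    var readSoFar = 0L
    val callback: () => Long = () => readSoFar
    readSoFar = size                      // pretend we read the whole partition
    // Publish completed partitions plus this partition's delta. Using only
    // callback() here would discard earlier partitions' bytes.
    metrics.setBytesRead(existingBytesRead + callback())
    existingBytesRead += callback()       // fold the final delta into the total
  }
  metrics.value
}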
