[SPARK-18702][SQL] input_file_block_start and input_file_block_length #16133

Closed · wants to merge 2 commits
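At a glance, this PR exposes the start offset and length of the input file block currently being read as two new SQL functions, alongside the existing input_file_name(). A minimal usage sketch, assuming the PR is merged and using a placeholder input path:

```scala
import org.apache.spark.sql.SparkSession

object InputFileBlockExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("input-file-block-example")
      .master("local[*]")
      .getOrCreate()

    // "/tmp/example/*.txt" is a placeholder; any file-based source should work.
    val df = spark.read.text("/tmp/example/*.txt")

    // Each row reports the file it came from plus that block's start offset and length.
    df.selectExpr(
        "input_file_name()",
        "input_file_block_start()",
        "input_file_block_length()",
        "value")
      .show(truncate = false)

    spark.stop()
  }
}
```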
34 changes: 18 additions & 16 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -131,9 +131,9 @@ class HadoopRDD[K, V](
minPartitions)
}

protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
protected val jobConfCacheKey: String = "rdd_%d_job_conf".format(id)

protected val inputFormatCacheKey = "rdd_%d_input_format".format(id)
protected val inputFormatCacheKey: String = "rdd_%d_input_format".format(id)

// used to build JobTracker ID
private val createTime = new Date()
@@ -210,22 +210,24 @@ class HadoopRDD[K, V](
override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
val iter = new NextIterator[(K, V)] {

val split = theSplit.asInstanceOf[HadoopPartition]
private val split = theSplit.asInstanceOf[HadoopPartition]
logInfo("Input split: " + split.inputSplit)
val jobConf = getJobConf()
private val jobConf = getJobConf()

val inputMetrics = context.taskMetrics().inputMetrics
val existingBytesRead = inputMetrics.bytesRead
private val inputMetrics = context.taskMetrics().inputMetrics
private val existingBytesRead = inputMetrics.bytesRead

// Sets the thread local variable for the file's name
// Sets InputFileBlockHolder for the file block's information
split.inputSplit.value match {
case fs: FileSplit => InputFileNameHolder.setInputFileName(fs.getPath.toString)
case _ => InputFileNameHolder.unsetInputFileName()
case fs: FileSplit =>
InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength)
case _ =>
InputFileBlockHolder.unset()
}

// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match {
private val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match {
case _: FileSplit | _: CombineFileSplit =>
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
case _ => None
@@ -235,23 +237,23 @@
// If we do a coalesce, however, we are likely to compute multiple partitions in the same
// task and in the same thread, in which case we need to avoid override values written by
// previous partitions (SPARK-13071).
def updateBytesRead(): Unit = {
private def updateBytesRead(): Unit = {
getBytesReadCallback.foreach { getBytesRead =>
inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
}
}

var reader: RecordReader[K, V] = null
val inputFormat = getInputFormat(jobConf)
private var reader: RecordReader[K, V] = null
private val inputFormat = getInputFormat(jobConf)
HadoopRDD.addLocalConfiguration(
new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(createTime),
context.stageId, theSplit.index, context.attemptNumber, jobConf)
reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)

// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener{ context => closeIfNeeded() }
val key: K = reader.createKey()
val value: V = reader.createValue()
private val key: K = reader.createKey()
private val value: V = reader.createValue()

override def getNext(): (K, V) = {
try {
@@ -270,7 +272,7 @@

override def close() {
if (reader != null) {
InputFileNameHolder.unsetInputFileName()
InputFileBlockHolder.unset()
// Close the reader and release it. Note: it's very important that we don't close the
// reader more than once, since that exposes us to MAPREDUCE-5918 when running against
// Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
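Because InputFileBlockHolder (introduced below) is a thread local, HadoopRDD sets it when compute() starts reading a split and clears it again in close(); otherwise a coalesced task that computes several partitions in one thread would keep serving the previous partition's file and block info, the same concern the SPARK-13071 comment above describes for bytesRead. A self-contained sketch of that lifecycle, using a stand-in holder rather than Spark's real one:

```scala
// Stand-in holder for illustration only; Spark's actual class is InputFileBlockHolder.
object BlockInfoHolder {
  private val current = new ThreadLocal[String] {
    override protected def initialValue(): String = ""
  }
  def set(info: String): Unit = current.set(info)
  def get: String = current.get()
  def unset(): Unit = current.remove()
}

object LifecycleSketch {
  def main(args: Array[String]): Unit = {
    // Simulates one task thread computing two coalesced partitions back to back.
    Seq("part-00000 @ offset 0", "part-00001 @ offset 134217728").foreach { split =>
      BlockInfoHolder.set(split)          // done at the top of compute()
      try {
        println(s"reading with block info: ${BlockInfoHolder.get}")
      } finally {
        BlockInfoHolder.unset()           // done in close(); the next partition
      }                                   // never sees stale file/block info
    }
  }
}
```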
77 changes: 77 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/InputFileBlockHolder.scala
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.rdd

import org.apache.spark.unsafe.types.UTF8String

/**
* This holds file names of the current Spark task. This is used in HadoopRDD,
* FileScanRDD, NewHadoopRDD and InputFileName function in Spark SQL.
*/
private[spark] object InputFileBlockHolder {
/**
* A wrapper around some input file information.
*
* @param filePath path of the file read, or empty string if not available.
* @param startOffset starting offset, in bytes, or -1 if not available.
* @param length size of the block, in bytes, or -1 if not available.
*/
private class FileBlock(val filePath: UTF8String, val startOffset: Long, val length: Long) {
def this() {
this(UTF8String.fromString(""), -1, -1)
}
}

/**
* The thread variable for the name of the current file being read. This is used by
* the InputFileName function in Spark SQL.
*/

[Review comment from a Member] nit: this comment can be updated too.
private[this] val inputBlock: ThreadLocal[FileBlock] = new ThreadLocal[FileBlock] {
override protected def initialValue(): FileBlock = new FileBlock
}

/**
* Returns the holding file name or empty string if it is unknown.
*/
def getInputFilePath: UTF8String = inputBlock.get().filePath

/**
* Returns the starting offset of the block currently being read, or -1 if it is unknown.
*/
def getStartOffset: Long = inputBlock.get().startOffset

/**
* Returns the length of the block being read, or -1 if it is unknown.
*/
def getLength: Long = inputBlock.get().length

/**
* Sets the thread-local input block.
*/
def set(filePath: String, startOffset: Long, length: Long): Unit = {
require(filePath != null, "filePath cannot be null")
require(startOffset >= 0, s"startOffset ($startOffset) cannot be negative")
require(length >= 0, s"length ($length) cannot be negative")
inputBlock.set(new FileBlock(UTF8String.fromString(filePath), startOffset, length))
}

/**
* Clears the input file block to default value.
*/
def unset(): Unit = inputBlock.remove()
}
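The holder is private[spark], so only code inside Spark's own packages can read it; the intended consumers are the new input_file_block_start / input_file_block_length expressions (not shown in this file). A hypothetical in-package helper sketching how such a consumer would read the three thread-local values:

```scala
package org.apache.spark.rdd

// Hypothetical helper, not part of this PR: shows how in-package code could read
// the thread-local block info that HadoopRDD / NewHadoopRDD populate per split.
object CurrentInputBlock {
  /** Returns (path, startOffset, length); path is "" and offsets are -1 when unknown. */
  def describe(): (String, Long, Long) = {
    val path = InputFileBlockHolder.getInputFilePath.toString // UTF8String -> String
    (path, InputFileBlockHolder.getStartOffset, InputFileBlockHolder.getLength)
  }
}
```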
49 changes: 0 additions & 49 deletions core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala

This file was deleted.

43 changes: 23 additions & 20 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -132,54 +132,57 @@ class NewHadoopRDD[K, V](

override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
val iter = new Iterator[(K, V)] {
val split = theSplit.asInstanceOf[NewHadoopPartition]
private val split = theSplit.asInstanceOf[NewHadoopPartition]
logInfo("Input split: " + split.serializableHadoopSplit)
val conf = getConf
private val conf = getConf

val inputMetrics = context.taskMetrics().inputMetrics
val existingBytesRead = inputMetrics.bytesRead
private val inputMetrics = context.taskMetrics().inputMetrics
private val existingBytesRead = inputMetrics.bytesRead

// Sets the thread local variable for the file's name
// Sets InputFileBlockHolder for the file block's information
split.serializableHadoopSplit.value match {
case fs: FileSplit => InputFileNameHolder.setInputFileName(fs.getPath.toString)
case _ => InputFileNameHolder.unsetInputFileName()
case fs: FileSplit =>
InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength)
case _ =>
InputFileBlockHolder.unset()
}

// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
val getBytesReadCallback: Option[() => Long] = split.serializableHadoopSplit.value match {
case _: FileSplit | _: CombineFileSplit =>
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
case _ => None
}
private val getBytesReadCallback: Option[() => Long] =
split.serializableHadoopSplit.value match {
case _: FileSplit | _: CombineFileSplit =>
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
case _ => None
}

// For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
// If we do a coalesce, however, we are likely to compute multiple partitions in the same
// task and in the same thread, in which case we need to avoid override values written by
// previous partitions (SPARK-13071).
def updateBytesRead(): Unit = {
private def updateBytesRead(): Unit = {
getBytesReadCallback.foreach { getBytesRead =>
inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
}
}

val format = inputFormatClass.newInstance
private val format = inputFormatClass.newInstance
format match {
case configurable: Configurable =>
configurable.setConf(conf)
case _ =>
}
val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
private val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
private val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
private var reader = format.createRecordReader(
split.serializableHadoopSplit.value, hadoopAttemptContext)
reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)

// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener(context => close())
var havePair = false
var finished = false
var recordsSinceMetricsUpdate = 0
private var havePair = false
private var finished = false
private var recordsSinceMetricsUpdate = 0

override def hasNext: Boolean = {
if (!finished && !havePair) {
@@ -215,7 +218,7 @@ class NewHadoopRDD[K, V](

private def close() {
if (reader != null) {
InputFileNameHolder.unsetInputFileName()
InputFileBlockHolder.unset()
// Close the reader and release it. Note: it's very important that we don't close the
// reader more than once, since that exposes us to MAPREDUCE-5918 when running against
// Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -371,6 +371,8 @@ object FunctionRegistry {
expression[Sha2]("sha2"),
expression[SparkPartitionID]("spark_partition_id"),
expression[InputFileName]("input_file_name"),
expression[InputFileBlockStart]("input_file_block_start"),
expression[InputFileBlockLength]("input_file_block_length"),
expression[MonotonicallyIncreasingID]("monotonically_increasing_id"),
expression[CurrentDatabase]("current_database"),
expression[CallMethodViaReflection]("reflect"),
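Registering the expressions here is what makes the names resolvable in plain SQL text (and via DESCRIBE FUNCTION), just as input_file_name already is. A quick check, assuming a SparkSession named spark built from a branch that includes this PR:

```scala
// Shows the function name and backing expression class once registration is in place.
spark.sql("DESCRIBE FUNCTION input_file_block_start").show(truncate = false)
spark.sql("DESCRIBE FUNCTION input_file_block_length").show(truncate = false)
```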

This file was deleted.
