move configuration for jobConf to HadoopRDD
CodingCat committed Mar 24, 2014
1 parent b7bdfa5 commit 9bd1fe3
Showing 3 changed files with 36 additions and 36 deletions.
20 changes: 7 additions & 13 deletions core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -15,18 +15,19 @@
* limitations under the License.
*/

package org.apache.hadoop.mapred
package org.apache.spark

import java.io.IOException
import java.text.NumberFormat
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.hadoop.mapred._
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

import org.apache.spark.Logging
import org.apache.spark.SerializableWritable
import org.apache.spark.rdd.HadoopRDD


/**
* Internal helper class that saves an RDD using a Hadoop OutputFormat. This is only public
@@ -59,7 +60,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)

def preSetup() {
setIDs(0, 0, 0)
setConfParams()
HadoopRDD.addLocalConfiguration("", 0, 0, 0, conf.value)

val jCtxt = getJobContext()
getOutputCommitter().setupJob(jCtxt)
@@ -68,7 +69,8 @@ class SparkHadoopWriter(@transient jobConf: JobConf)

def setup(jobid: Int, splitid: Int, attemptid: Int) {
setIDs(jobid, splitid, attemptid)
setConfParams()
HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(now),
jobid, splitID, attemptID, conf.value)
}

def open() {
@@ -167,14 +169,6 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
taID = new SerializableWritable[TaskAttemptID](
new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
}

private def setConfParams() {
conf.value.set("mapred.job.id", jID.value.toString)
conf.value.set("mapred.tip.id", taID.value.getTaskID.toString)
conf.value.set("mapred.task.id", taID.value.toString)
conf.value.setBoolean("mapred.task.is.map", true)
conf.value.setInt("mapred.task.partition", splitID)
}
}

private[apache]
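For context, a hedged sketch of how SparkHadoopWriter is typically driven after this change. preSetup(), setup() and open() appear in the diff above; write(), close(), commit() and commitJob() come from the existing class (not shown in this diff), and the JobConf contents and ID values below are placeholders rather than what the save path in PairRDDFunctions actually passes.

import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkHadoopWriter

// A JobConf that would normally carry the output path, key/value classes and OutputFormat.
val jobConf = new JobConf()
val writer = new SparkHadoopWriter(jobConf)

// Driver side: job-level setup; the conf gets placeholder IDs ("", 0, 0, 0) here.
writer.preSetup()

// Inside each task, with the real partition index and attempt number:
val partitionIndex = 0
val attemptNumber = 0
writer.setup(0, partitionIndex, attemptNumber)
writer.open()
// ... writer.write(key, value) for every record in the partition ...
writer.close()
writer.commit()

// Driver side again, once every task has committed its output:
writer.commitJob()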
45 changes: 26 additions & 19 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -39,7 +39,6 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.NextIterator


/**
* A Spark split class that wraps around a Hadoop InputSplit.
*/
@@ -117,7 +116,7 @@ class HadoopRDD[K, V](

protected val inputFormatCacheKey = "rdd_%d_input_format".format(id)

// used to build JT ID
// used to build JobTracker ID
private val createTime = new Date()

// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
@@ -175,28 +174,13 @@ class HadoopRDD[K, V](
override def compute(theSplit: Partition, context: TaskContext) = {
val iter = new NextIterator[(K, V)] {

private def localizeConfiguration(conf: JobConf) {
//generate job id
val stageId = context.stageId
val dummyJobTrackerID = new SimpleDateFormat("yyyyMMddHHmm").format(createTime)
val jobId = new JobID(dummyJobTrackerID, stageId)
val splitID = theSplit.index
val attemptId = (context.attemptId % Int.MaxValue).toInt
val taId = new TaskAttemptID(new TaskID(jobId, true, splitID), attemptId)

conf.set("mapred.tip.id", taId.getTaskID.toString)
conf.set("mapred.task.id", taId.toString)
conf.setBoolean("mapred.task.is.map", true)
conf.setInt("mapred.task.partition", splitID)
conf.set("mapred.job.id", jobId.toString)
}

val split = theSplit.asInstanceOf[HadoopPartition]
logInfo("Input split: " + split.inputSplit)
var reader: RecordReader[K, V] = null
val jobConf = getJobConf()
val inputFormat = getInputFormat(jobConf)
localizeConfiguration(jobConf)
HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)

// Register an on-task-completion callback to close the input stream.
@@ -248,4 +232,27 @@ private[spark] object HadoopRDD {

def putCachedMetadata(key: String, value: Any) =
SparkEnv.get.hadoopJobMetadata.put(key, value)

/**
 * Set the Hadoop task-identity properties (mapred.job.id, mapred.tip.id, mapred.task.id,
 * mapred.task.is.map, mapred.task.partition) on the given JobConf so that code written
 * against the old mapred API can see them.
 *
 * @param jtId JobTracker ID string (a formatted timestamp) used as the JobID identifier
 * @param jobId numeric job ID; HadoopRDD.compute passes the stage ID
 * @param splitId index of the input split / partition being processed
 * @param attemptId task attempt number
 * @param conf the JobConf to update
 */
def addLocalConfiguration(jtId: String, jobId: Int, splitId: Int, attemptId: Int,
conf: JobConf) {
// generate the Hadoop job and task attempt IDs from the supplied Spark IDs
val jobID = new JobID(jtId, jobId)
val taId = new TaskAttemptID(new TaskID(jobID, true, splitId), attemptId)

conf.set("mapred.tip.id", taId.getTaskID.toString)
conf.set("mapred.task.id", taId.toString)
conf.setBoolean("mapred.task.is.map", true)
conf.setInt("mapred.task.partition", splitId)
conf.set("mapred.job.id", jobID.toString)
}
}
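To make the new helper concrete, a minimal sketch that calls it on a fresh JobConf and reads the task-identity keys back. The jtId string and the numeric IDs are arbitrary examples, and since the HadoopRDD companion object is private[spark], code like this only compiles inside the org.apache.spark package (for instance in a test).

import java.text.SimpleDateFormat
import java.util.Date
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.HadoopRDD

val conf = new JobConf()
val jtId = new SimpleDateFormat("yyyyMMddHHmm").format(new Date())

HadoopRDD.addLocalConfiguration(jtId, jobId = 2, splitId = 5, attemptId = 0, conf)

// Hadoop code running in the task (an InputFormat, OutputFormat or committer) now sees
// the usual old-API identity properties, e.g.:
println(conf.get("mapred.job.id"))                    // job_<jtId>_0002
println(conf.get("mapred.task.id"))                   // attempt_<jtId>_0002_m_000005_0
println(conf.getInt("mapred.task.partition", -1))     // 5
println(conf.getBoolean("mapred.task.is.map", false)) // true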
7 changes: 3 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -34,14 +34,13 @@ import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob, RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob,
RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}

// SparkHadoopWriter and SparkHadoopMapReduceUtil are actually source files defined in Spark.
import org.apache.hadoop.mapred.SparkHadoopWriter

import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.SparkHadoopWriter
import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.partial.{BoundedDouble, PartialResult}
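The PairRDDFunctions change above is only an import move: SparkHadoopWriter now lives in org.apache.spark instead of being declared inside the org.apache.hadoop.mapred package. User-facing behaviour is unchanged; a save like the following (a sketch assuming an existing SparkContext named sc) still goes through SparkHadoopWriter, which now delegates the mapred.* task setup to HadoopRDD.addLocalConfiguration.

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.spark.SparkContext._   // brings PairRDDFunctions into scope

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
  .map { case (k, v) => (new Text(k), new IntWritable(v)) }

// Writes through the old mapred API, exercising SparkHadoopWriter under the hood.
pairs.saveAsHadoopFile("/tmp/spark-output", classOf[Text], classOf[IntWritable],
  classOf[TextOutputFormat[Text, IntWritable]])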
