Skip to content

Commit

Permalink
bugfix
Browse files Browse the repository at this point in the history
  • Loading branch information
Davies Liu committed Oct 25, 2014
1 parent 32bd815 commit 1fd70df
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 5 deletions.
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ private[spark] object CheckpointRDD extends Logging {

def writeToFile[T: ClassTag](
path: String,
conf: Configuration,
conf: SerializableWritable[Configuration],
blockSize: Int = -1
)(ctx: TaskContext, iterator: Iterator[T]) {
val env = SparkEnv.get
val outputDir = new Path(path)
val fs = outputDir.getFileSystem(conf)
val fs = outputDir.getFileSystem(conf.value)

val finalOutputName = splitIdToFile(ctx.partitionId)
val finalOutputPath = new Path(outputDir, finalOutputName)
Expand Down Expand Up @@ -160,7 +160,7 @@ private[spark] object CheckpointRDD extends Logging {
val conf = SparkHadoopUtil.get.newConfiguration(new SparkConf())
val fs = path.getFileSystem(conf)
sc.runJob(rdd, CheckpointRDD.writeToFile[Int](
path.toString, conf, 1024) _)
path.toString, new SerializableWritable(conf), 1024) _)
val cpRDD = new CheckpointRDD[Int](sc, path.toString)
assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
assert(cpRDD.collect.toList == rdd.collect.toList, "Data of partitions not the same")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
}

// Save to file, and reload it as an RDD
rdd.context.runJob(rdd,
CheckpointRDD.writeToFile[T](path.toString, rdd.context.hadoopConfiguration) _)
val sConf = new SerializableWritable(rdd.context.hadoopConfiguration)
rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, sConf) _)
val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
if (newRDD.partitions.size != rdd.partitions.size) {
throw new SparkException(
Expand Down

0 comments on commit 1fd70df

Please sign in to comment.