Skip to content

Commit

Permalink
SPARK-1482: Fix potential resource leaks in saveAsHadoopDataset and saveAsNewAPIHadoopDataset
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Apr 13, 2014
1 parent ca11919 commit 06b197a
Showing 1 changed file with 18 additions and 12 deletions.
30 changes: 18 additions & 12 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -693,11 +693,15 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
val committer = format.getOutputCommitter(hadoopContext)
committer.setupTask(hadoopContext)
val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
while (iter.hasNext) {
val (k, v) = iter.next()
writer.write(k, v)
try {
while (iter.hasNext) {
val (k, v) = iter.next()
writer.write(k, v)
}
}
finally {
writer.close(hadoopContext)
}
writer.close(hadoopContext)
committer.commitTask(hadoopContext)
return 1
}
Expand Down Expand Up @@ -750,15 +754,17 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])

writer.setup(context.stageId, context.partitionId, attemptNumber)
writer.open()

var count = 0
while(iter.hasNext) {
val record = iter.next()
count += 1
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
try {
var count = 0
while(iter.hasNext) {
val record = iter.next()
count += 1
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
}
}
finally {
writer.close()
}

writer.close()
writer.commit()
}

Expand Down

0 comments on commit 06b197a

Please sign in to comment.