Skip to content

Commit

Permalink
[SPARK-12450][MLLIB] Un-persist broadcasted variables in KMeans
Browse files Browse the repository at this point in the history
SPARK-12450. Un-persist broadcasted variables in KMeans.

Author: RJ Nowling <[email protected]>

Closes #10415 from rnowling/spark-12450.
  • Loading branch information
rnowling authored and jkbradley committed Jan 5, 2016
1 parent 1c6cf1a commit 78015a8
Showing 1 changed file with 8 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,8 @@ class KMeans private (
contribs.iterator
}.reduceByKey(mergeContribs).collectAsMap()

bcActiveCenters.unpersist(blocking = false)

// Update the cluster centers and costs for each active run
for ((run, i) <- activeRuns.zipWithIndex) {
var changed = false
Expand Down Expand Up @@ -419,7 +421,10 @@ class KMeans private (
s0
}
)

bcNewCenters.unpersist(blocking = false)
preCosts.unpersist(blocking = false)

val chosen = data.zip(costs).mapPartitionsWithIndex { (index, pointsWithCosts) =>
val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
pointsWithCosts.flatMap { case (p, c) =>
Expand Down Expand Up @@ -448,6 +453,9 @@ class KMeans private (
((r, KMeans.findClosest(bcCenters.value(r), p)._1), 1.0)
}
}.reduceByKey(_ + _).collectAsMap()

bcCenters.unpersist(blocking = false)

val finalCenters = (0 until runs).par.map { r =>
val myCenters = centers(r).toArray
val myWeights = (0 until myCenters.length).map(i => weightMap.getOrElse((r, i), 0.0)).toArray
Expand Down

0 comments on commit 78015a8

Please sign in to comment.