Skip to content

Commit

Permalink
[MLLIB] set RDD names in ALS
Browse files Browse the repository at this point in the history
This is very useful when debugging & fine tuning jobs with large data sets.

Author: Neville Li <[email protected]>

Closes #966 from nevillelyh/master and squashes the following commits:

6747764 [Neville Li] [MLLIB] use string interpolation for RDD names
3b15d34 [Neville Li] [MLLIB] set RDD names in ALS
  • Loading branch information
nevillelyh authored and mengxr committed Jun 4, 2014
1 parent c402a4a commit b8d2580
Showing 1 changed file with 11 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ class ALS private (
val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock, partitioner)
val (productInLinks, productOutLinks) =
makeLinkRDDs(numBlocks, ratingsByProductBlock, partitioner)
userInLinks.setName("userInLinks")
userOutLinks.setName("userOutLinks")
productInLinks.setName("productInLinks")
productOutLinks.setName("productOutLinks")

// Initialize user and product factors randomly, but use a deterministic seed for each
// partition so that fault recovery works
Expand All @@ -225,14 +229,14 @@ class ALS private (
// perform ALS update
logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
// Persist users because it will be called twice.
users.persist()
users.setName(s"users-$iter").persist()
val YtY = Some(sc.broadcast(computeYtY(users)))
val previousProducts = products
products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda,
alpha, YtY)
previousProducts.unpersist()
logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
products.persist()
products.setName(s"products-$iter").persist()
val XtX = Some(sc.broadcast(computeYtY(products)))
val previousUsers = users
users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda,
Expand All @@ -245,22 +249,24 @@ class ALS private (
logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda,
alpha, YtY = None)
products.setName(s"products-$iter")
logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda,
alpha, YtY = None)
users.setName(s"users-$iter")
}
}

// The last `products` will be used twice. One to generate the last `users` and the other to
// generate `productsOut`. So we cache it for better performance.
products.persist()
products.setName("products").persist()

// Flatten and cache the two final RDDs to un-block them
val usersOut = unblockFactors(users, userOutLinks)
val productsOut = unblockFactors(products, productOutLinks)

usersOut.persist()
productsOut.persist()
usersOut.setName("usersOut").persist()
productsOut.setName("productsOut").persist()

// Materialize usersOut and productsOut.
usersOut.count()
Expand Down

0 comments on commit b8d2580

Please sign in to comment.