Skip to content

Commit

Permalink
Replace mapValues with immutable Map where applicable (#363)
Browse files Browse the repository at this point in the history
  • Loading branch information
tovbinm authored and leahmcguire committed Jul 11, 2019
1 parent 3e0ae2c commit a53decd
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,10 @@ private[op] class OpMultiClassificationEvaluator
ThresholdMetrics(
topNs = topNs,
thresholds = thresholds,
correctCounts = agg.mapValues { case (cor, _) => cor.toSeq },
incorrectCounts = agg.mapValues { case (_, incor) => incor.toSeq },
noPredictionCounts = agg.mapValues { case (cor, incor) =>
(Array.fill(nThresholds)(nRows) + cor.map(-_) + incor.map(-_)).toSeq
correctCounts = agg.map { case (k, (cor, _)) => k -> cor.toSeq },
incorrectCounts = agg.map { case (k, (_, incor)) => k -> incor.toSeq },
noPredictionCounts = agg.map { case (k, (cor, incor)) =>
k -> (Array.fill(nThresholds)(nRows) + cor.map(-_) + incor.map(-_)).toSeq
}
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private[filters] case class PreparedFeatures
* @return pair consisting of response and predictor summaries (in this order)
*/
def summaries: (Map[FeatureKey, Summary], Map[FeatureKey, Summary]) =
responses.mapValues(Summary(_)) -> predictors.mapValues(Summary(_))
responses.map { case (k, s) => k -> Summary(s) } -> predictors.map { case (k, s) => k -> Summary(s) }

/**
* Computes vector of size responseKeys.length + predictorKeys.length. The first responses.length
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ class IsValidPhoneMapDefaultCountry(uid: String = UID[IsValidPhoneMapDefaultCoun

phoneNumberMap.value
.mapValues(p => PhoneNumberParser.validate(p.toPhone, region, isStrict))
.collect{ case(k, v) if !v.isEmpty => k -> v.value.get }.toBinaryMap
.collect { case (k, SomeValue(Some(b))) => k -> b }.toBinaryMap
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,5 @@ class TimePeriodMapTransformer[I <: DateMap]
) extends UnaryTransformer[I, IntegralMap](operationName = "dateMapToTimePeriod", uid = uid) {

override def transformFn: I => IntegralMap =
(i: I) => i.value.mapValues(t => period.extractIntFromMillis(t).toLong).toIntegralMap
(i: I) => i.value.map { case (k, t) => k -> period.extractIntFromMillis(t).toLong }.toIntegralMap
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ package com.salesforce.op.filters

import com.salesforce.op.features.types._
import com.salesforce.op.features.{FeatureBuilder, OPFeature, TransientFeature}
import com.salesforce.op.filters.Summary._
import com.salesforce.op.stages.impl.feature.TimePeriod
import com.salesforce.op.stages.impl.preparators.CorrelationType
import com.salesforce.op.test.{Passenger, PassengerSparkFixtureTest}
Expand All @@ -42,38 +43,13 @@ import org.apache.spark.sql.DataFrame
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner
import com.salesforce.op.filters.Summary._

import scala.util.{Failure, Success, Try}

@RunWith(classOf[JUnitRunner])
class PreparedFeaturesTest extends FlatSpec with PassengerSparkFixtureTest {

val responseKey1: FeatureKey = "Response1" -> None
val responseKey2: FeatureKey = "Response2" -> None
val predictorKey1: FeatureKey = "Predictor1" -> None
val predictorKey2A: FeatureKey = "Predictor2" -> Option("A")
val predictorKey2B: FeatureKey = "Predictor2" -> Option("B")

val preparedFeatures1 = PreparedFeatures(
responses = Map(responseKey1 -> Right(Seq(1.0)), responseKey2 -> Right(Seq(0.5))),
predictors = Map(
predictorKey1 -> Right(Seq(0.0, 0.0)),
predictorKey2A -> Left(Seq("i", "ii")),
predictorKey2B -> Left(Seq("iii"))))
val preparedFeatures2 = PreparedFeatures(
responses = Map(responseKey1 -> Right(Seq(0.0))),
predictors = Map(predictorKey1 -> Right(Seq(0.4, 0.5))))
val preparedFeatures3 = PreparedFeatures(
responses = Map(responseKey2 -> Right(Seq(-0.5))),
predictors = Map(predictorKey2A -> Left(Seq("iv"))))
val allPreparedFeatures = Seq(preparedFeatures1, preparedFeatures2, preparedFeatures3)
implicit val sgTuple2 = new Tuple2Semigroup[Map[FeatureKey, Summary], Map[FeatureKey, Summary]]()
val (allResponseSummaries, allPredictorSummaries) = allPreparedFeatures.map(_.summaries).reduce(_ + _)

val allResponseKeys1 = Array(responseKey1, responseKey2)
val allResponseKeys2 = Array(responseKey1)
val allPredictorKeys1 = Array(predictorKey1, predictorKey2A, predictorKey2B)
val allPredictorKeys2 = Array(predictorKey1)

import PreparedFeaturesTestData._

Spec[PreparedFeatures] should "produce correct summaries" in {
val (responseSummaries1, predictorSummaries1) = preparedFeatures1.summaries
Expand All @@ -100,6 +76,15 @@ class PreparedFeaturesTest extends FlatSpec with PassengerSparkFixtureTest {
predictorKey2B -> Summary(1.0, 1.0, 1.0, 1))
}

it should "produce summaries that are serializable" in {
Try(spark.sparkContext.makeRDD(allPreparedFeatures).map(_.summaries).reduce(_ + _)) match {
case Failure(error) => fail(error)
case Success((responses, predictors)) =>
responses shouldBe allResponseSummaries
predictors shouldBe allPredictorSummaries
}
}

it should "produce correct null-label leakage vector with single response" in {
preparedFeatures1.getNullLabelLeakageVector(allResponseKeys2, allPredictorKeys1).toArray shouldEqual
Array(1.0, 0.0, 0.0, 0.0)
Expand Down Expand Up @@ -218,3 +203,40 @@ class PreparedFeaturesTest extends FlatSpec with PassengerSparkFixtureTest {
}.toSeq should contain theSameElementsInOrderAs expectedResult
}
}

object PreparedFeaturesTestData {

val responseKey1: FeatureKey = "Response1" -> None
val responseKey2: FeatureKey = "Response2" -> None
val predictorKey1: FeatureKey = "Predictor1" -> None
val predictorKey2A: FeatureKey = "Predictor2" -> Option("A")
val predictorKey2B: FeatureKey = "Predictor2" -> Option("B")

val preparedFeatures1 = PreparedFeatures(
responses = Map(responseKey1 -> Right(Seq(1.0)), responseKey2 -> Right(Seq(0.5))),
predictors = Map(
predictorKey1 -> Right(Seq(0.0, 0.0)),
predictorKey2A -> Left(Seq("i", "ii")),
predictorKey2B -> Left(Seq("iii")))
)

val preparedFeatures2 = PreparedFeatures(
responses = Map(responseKey1 -> Right(Seq(0.0))),
predictors = Map(predictorKey1 -> Right(Seq(0.4, 0.5)))
)

val preparedFeatures3 = PreparedFeatures(
responses = Map(responseKey2 -> Right(Seq(-0.5))),
predictors = Map(predictorKey2A -> Left(Seq("iv")))
)

val allPreparedFeatures = Seq(preparedFeatures1, preparedFeatures2, preparedFeatures3)
implicit val sgTuple2 = new Tuple2Semigroup[Map[FeatureKey, Summary], Map[FeatureKey, Summary]]()
val (allResponseSummaries, allPredictorSummaries) = allPreparedFeatures.map(_.summaries).reduce(_ + _)

val allResponseKeys1 = Array(responseKey1, responseKey2)
val allResponseKeys2 = Array(responseKey1)
val allPredictorKeys1 = Array(predictorKey1, predictorKey2A, predictorKey2B)
val allPredictorKeys2 = Array(predictorKey1)

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import com.twitter.algebird._
* However, order does not matter, so {a, a, b} and {a, b, a} are the same multiset.
*/
trait ExtendedMultiset extends MapMonoid[String, Long] with Group[Map[String, Long]] {
override def negate(kv: Map[String, Long]): Map[String, Long] = kv.mapValues { v => -v }
override def negate(kv: Map[String, Long]): Map[String, Long] = kv.map { case (k, v) => k -> -v }

override def minus(x: Map[String, Long], y: Map[String, Long]): Map[String, Long] = {
val keys = x.keySet ++ y.keySet
Expand Down

0 comments on commit a53decd

Please sign in to comment.