
Return scoring feature distributions from RawFeatureFilter #171

Merged · 14 commits · Nov 3, 2018
34 changes: 18 additions & 16 deletions core/src/main/scala/com/salesforce/op/OpWorkflow.scala
@@ -31,7 +31,7 @@
package com.salesforce.op

import com.salesforce.op.features.OPFeature
import com.salesforce.op.filters.{FeatureDistribution, RawFeatureFilter, Summary}
import com.salesforce.op.filters.{FeatureDistribution, FilteredRawData, RawFeatureFilter, Summary}
import com.salesforce.op.readers.Reader
import com.salesforce.op.stages.OPStage
import com.salesforce.op.stages.impl.feature.TimePeriod
@@ -129,13 +129,13 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
initialStages.foreach { stg =>
val inFeatures = stg.getInputFeatures()
val blacklistRemoved = inFeatures
.filterNot{ f => allBlacklisted.exists(bl => bl.sameOrigin(f)) }
.map{ f => if (f.isRaw) f.withDistributions(distributions.collect{ case d if d.name == f.name => d }) else f }
.filterNot { f => allBlacklisted.exists(bl => bl.sameOrigin(f)) }
.map { f =>
if (f.isRaw) f.withDistributions(distributions.collect { case d if d.name == f.name => d }) else f
}
val inputsChanged = blacklistRemoved.map{ f => allUpdated.find(u => u.sameOrigin(f)).getOrElse(f) }
val oldOutput = stg.getOutput()
Try{
stg.setInputFeatureArray(inputsChanged).setOutputFeatureName(oldOutput.name).getOutput()
} match {
Try(stg.setInputFeatureArray(inputsChanged).setOutputFeatureName(oldOutput.name).getOutput()) match {
case Success(out) => allUpdated += out
case Failure(e) =>
if (initialResultFeatures.contains(oldOutput)) throw new RuntimeException(
@@ -221,23 +221,25 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
*/
protected def generateRawData()(implicit spark: SparkSession): DataFrame = {
(reader, rawFeatureFilter) match {
case (None, None) => throw new IllegalArgumentException("Data reader must be set either directly on the" +
" workflow or through the RawFeatureFilter")
case (None, None) => throw new IllegalArgumentException(
"Data reader must be set either directly on the workflow or through the RawFeatureFilter")
case (Some(r), None) =>
checkReadersAndFeatures()
r.generateDataFrame(rawFeatures, parameters).persist()
case (rd, Some(rf)) =>
rd match {
case None => setReader(rf.trainingReader)
case Some(r) => if (r != rf.trainingReader) log.warn("Workflow data reader and RawFeatureFilter training" +
" reader do not match! The RawFeatureFilter training reader will be used to generate the data for training")
case Some(r) => if (r != rf.trainingReader) log.warn(
"Workflow data reader and RawFeatureFilter training reader do not match! " +
"The RawFeatureFilter training reader will be used to generate the data for training")
}
checkReadersAndFeatures()
val filteredRawData = rf.generateFilteredRaw(rawFeatures, parameters)
setRawFeatureDistributions(filteredRawData.featureDistributions.toArray)
setBlacklist(filteredRawData.featuresToDrop, filteredRawData.featureDistributions)
setBlacklistMapKeys(filteredRawData.mapKeysToDrop)
filteredRawData.cleanedData
val FilteredRawData(cleanedData, featuresToDrop, mapKeysToDrop, featureDistributions) =
rf.generateFilteredRaw(rawFeatures, parameters)
setRawFeatureDistributions(featureDistributions.toArray)
setBlacklist(featuresToDrop, featureDistributions)
setBlacklistMapKeys(mapKeysToDrop)
cleanedData
}
}
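As an aside, the `(reader, rawFeatureFilter)` match above encodes a precedence rule: an explicit reader is used on its own, the filter's training reader always wins when a filter is set, and neither being present is an error. A minimal standalone sketch of that rule (hypothetical helper, not TransmogrifAI API):

```scala
// Sketch: the reader-precedence rule generateRawData applies.
// `resolveReader` is illustrative only and does not exist in the codebase.
def resolveReader[R](workflowReader: Option[R], filterTrainingReader: Option[R]): R =
  (workflowReader, filterTrainingReader) match {
    case (None, None) => throw new IllegalArgumentException(
      "Data reader must be set either directly on the workflow or through the RawFeatureFilter")
    case (Some(r), None) => r                     // plain workflow reader path
    case (_, Some(filterReader)) => filterReader  // RawFeatureFilter's training reader wins
  }

assert(resolveReader(Some("workflow"), Some("filter")) == "filter")
```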

@@ -537,7 +539,7 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
rawFeatureFilter = Option {
new RawFeatureFilter(
trainingReader = training.get,
scoreReader = scoringReader,
scoringReader = scoringReader,
bins = bins,
minFill = minFillRate,
maxFillDifference = maxFillDifference,
18 changes: 16 additions & 2 deletions core/src/main/scala/com/salesforce/op/OpWorkflowCore.scala
@@ -32,7 +32,7 @@ package com.salesforce.op

import com.salesforce.op.utils.stages.FitStagesUtil._
import com.salesforce.op.utils.stages.FitStagesUtil
import com.salesforce.op.features.OPFeature
import com.salesforce.op.features.{FeatureDistributionType, OPFeature}
import com.salesforce.op.features.types.FeatureType
import com.salesforce.op.filters.FeatureDistribution
import com.salesforce.op.readers.{CustomReader, Reader, ReaderKey}
@@ -195,11 +195,25 @@ private[op] trait OpWorkflowCore {
final def getParameters(): OpParams = parameters

/**
* Get raw feature distribution information computed during raw feature filter
* Get raw feature distribution information computed on training and scoring data during raw feature filter
* @return sequence of feature distribution information
*/
final def getRawFeatureDistributions(): Array[FeatureDistribution] = rawFeatureDistributions

/**
* Get raw feature distribution information computed on training data during raw feature filter
* @return sequence of feature distribution information
*/
final def getRawTrainingFeatureDistributions(): Array[FeatureDistribution] =
rawFeatureDistributions.filter(_.`type` == FeatureDistributionType.Training)

/**
* Get raw feature distribution information computed on scoring data during raw feature filter
* @return sequence of feature distribution information
*/
final def getRawScoringFeatureDistributions(): Array[FeatureDistribution] =
rawFeatureDistributions.filter(_.`type` == FeatureDistributionType.Scoring)
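A possible use of the two new accessors, sketched against a fitted model; the `fillRateDrift` helper and the printed comparison are illustrative, not part of the PR:

```scala
import com.salesforce.op.OpWorkflowModel

// Sketch: compare training vs. scoring fill rates per raw feature on a model
// for which RawFeatureFilter has run. featureKey and fillRate() are the
// FeatureDistribution members shown later in this diff.
def fillRateDrift(model: OpWorkflowModel): Unit = {
  val training = model.getRawTrainingFeatureDistributions()
  val scoring = model.getRawScoringFeatureDistributions()
  val scoringByKey = scoring.map(d => d.featureKey -> d).toMap
  for {
    t <- training
    s <- scoringByKey.get(t.featureKey)
  } println(f"${t.name}: train fill=${t.fillRate()}%.3f, score fill=${s.fillRate()}%.3f")
}
```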

/**
* Determine if any of the raw features do not have a matching reader
*/
core/src/main/scala/com/salesforce/op/OpWorkflowModelReader.scala
@@ -32,10 +32,9 @@ package com.salesforce.op

import com.salesforce.op.OpWorkflowModelReadWriteShared.FieldNames._
import com.salesforce.op.features.{FeatureJsonHelper, OPFeature, TransientFeature}
import com.salesforce.op.stages.{OpPipelineStageReader, _}
import OpPipelineStageReadWriteShared._
import com.salesforce.op.filters.FeatureDistribution
import com.salesforce.op.utils.json.JsonUtils
import com.salesforce.op.stages.OpPipelineStageReadWriteShared._
import com.salesforce.op.stages.{OpPipelineStageReader, _}
import org.apache.spark.ml.util.MLReader
import org.json4s.JsonAST.{JArray, JNothing, JValue}
import org.json4s.jackson.JsonMethods.parse
@@ -158,7 +157,7 @@ class OpWorkflowModelReader(val workflow: OpWorkflow) extends MLReader[OpWorkflowModel]
private def resolveRawFeatureDistributions(json: JValue): Try[Array[FeatureDistribution]] = {
if ((json \ RawFeatureDistributions.entryName) != JNothing) { // for backwards compatibility
val distString = (json \ RawFeatureDistributions.entryName).extract[String]
JsonUtils.fromString[Array[FeatureDistribution]](distString)
FeatureDistribution.fromJson(distString)
} else {
Success(Array.empty[FeatureDistribution])
}
core/src/main/scala/com/salesforce/op/OpWorkflowModelWriter.scala
@@ -31,8 +31,8 @@
package com.salesforce.op

import com.salesforce.op.features.FeatureJsonHelper
import com.salesforce.op.filters.FeatureDistribution
import com.salesforce.op.stages.{OpPipelineStageBase, OpPipelineStageWriter}
import com.salesforce.op.utils.json.JsonUtils
import enumeratum._
import org.apache.hadoop.fs.Path
import org.apache.spark.ml.util.MLWriter
@@ -81,8 +81,7 @@ class OpWorkflowModelWriter(val model: OpWorkflowModel) extends MLWriter {
(FN.AllFeatures.entryName -> allFeaturesJArray) ~
(FN.Parameters.entryName -> model.parameters.toJson(pretty = false)) ~
(FN.TrainParameters.entryName -> model.trainingParams.toJson(pretty = false)) ~
(FN.RawFeatureDistributions.entryName -> JsonUtils.toJsonString(model.getRawFeatureDistributions(),
pretty = false))
(FN.RawFeatureDistributions.entryName -> FeatureDistribution.toJson(model.getRawFeatureDistributions()))
}

private def resultFeaturesJArray(): JArray =
core/src/main/scala/com/salesforce/op/filters/AllFeatureInformation.scala
@@ -34,17 +34,20 @@ package com.salesforce.op.filters
* Contains all feature distribution summaries and null label-leakage correlations used to
* determine dropped features in [[RawFeatureFilter]].
*
* @param responseSummaries response summaries
* @param responseDistributions response distributions
* @param predictorSummaries predictor summaries
* @param responseSummaries response summaries
* @param responseDistributions response distributions
* @param predictorSummaries predictor summaries
* @param predictorDistributions predictor distributions
* @param correlationInfo null label-leakage correlation map
* 1st level keys correspond to response keys
* 2nd level keys correspond to predictor keys with values being null-label leakage corr. value
* @param correlationInfo null label-leakage correlation map
* 1st level keys correspond to response keys
* 2nd level keys correspond to predictor keys with values being
* null-label leakage corr. value
*/
private[op] case class AllFeatureInformation(
responseSummaries: Map[FeatureKey, Summary],
responseDistributions: Array[FeatureDistribution],
predictorSummaries: Map[FeatureKey, Summary],
predictorDistributions: Array[FeatureDistribution],
correlationInfo: Map[FeatureKey, Map[FeatureKey, Double]])
private[op] case class AllFeatureInformation
(
responseSummaries: Map[FeatureKey, Summary],
responseDistributions: Array[FeatureDistribution],
predictorSummaries: Map[FeatureKey, Summary],
predictorDistributions: Array[FeatureDistribution],
correlationInfo: Map[FeatureKey, Map[FeatureKey, Double]]
)
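The `correlationInfo` map nests two levels: response feature key, then predictor feature key, with the null-label leakage correlation as the value. A small lookup sketch with made-up keys and values (`FeatureKey` in the filters package is a name plus optional map key):

```scala
// Sketch: FeatureKey is (featureName, Option[mapKey]); all values are made up.
type FeatureKey = (String, Option[String])

val correlationInfo: Map[FeatureKey, Map[FeatureKey, Double]] = Map(
  ("label", None) -> Map(
    ("age", None)             -> 0.02,
    ("address", Some("city")) -> 0.87 // high null-label leakage: a drop candidate
  )
)

// Null-label leakage of predictor ("address", "city") against response "label":
val leakage: Option[Double] =
  correlationInfo.get(("label", None)).flatMap(_.get(("address", Some("city"))))
```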
core/src/main/scala/com/salesforce/op/filters/FeatureDistribution.scala
@@ -30,23 +30,30 @@

package com.salesforce.op.filters

import com.salesforce.op.features.FeatureDistributionLike
import java.util.Objects

import com.salesforce.op.features.{FeatureDistributionLike, FeatureDistributionType}
import com.salesforce.op.stages.impl.feature.{HashAlgorithm, Inclusion, NumericBucketizer}
import com.salesforce.op.utils.json.EnumEntrySerializer
import com.twitter.algebird.Monoid._
import com.twitter.algebird.Operators._
import com.twitter.algebird.Semigroup
import org.apache.spark.mllib.feature.HashingTF
import org.json4s.jackson.Serialization
import org.json4s.{DefaultFormats, Formats}

import scala.util.Try

/**
* Class containing summary information for a feature
*
* @param name name of the feature
* @param key map key associated with distribution (when the feature is a map)
* @param count total count of feature seen
* @param nulls number of empties seen in feature
* @param name name of the feature
* @param key map key associated with distribution (when the feature is a map)
* @param count total count of feature seen
* @param nulls number of empties seen in feature
* @param distribution binned counts of feature values (hashed for strings, evenly spaced bins for numerics)
* @param summaryInfo either min and max number of tokens for text data,
* or splits used for bins for numeric data
* @param summaryInfo either min and max number of tokens for text data, or splits used for bins for numeric data
* @param `type` feature distribution type: training or scoring
*/
case class FeatureDistribution
(
@@ -55,7 +62,8 @@ case class FeatureDistribution
count: Long,
nulls: Long,
distribution: Array[Double],
summaryInfo: Array[Double]
summaryInfo: Array[Double],
`type`: FeatureDistributionType = FeatureDistributionType.Training
) extends FeatureDistributionLike {

/**
@@ -64,12 +72,17 @@ case class FeatureDistribution
def featureKey: FeatureKey = (name, key)

/**
* Check that feature distributions belong to the same feature and key.
* Check that feature distributions belong to the same feature, key and type.
*
* @param fd distribution to compare to
*/
def checkMatch(fd: FeatureDistribution): Unit =
require(name == fd.name && key == fd.key, "Name and key must match to compare or combine FeatureDistribution")
private def checkMatch(fd: FeatureDistribution): Unit = {
def check[T](field: String, v1: T, v2: T): Unit = require(v1 == v2,
s"$field must match to compare or combine feature distributions: $v1 != $v2"
)
check("Name", name, fd.name)
check("Key", key, fd.key)
}

/**
* Get fill rate of feature
@@ -89,7 +102,7 @@ case class FeatureDistribution
val combinedDist = distribution + fd.distribution
// summary info can be empty or min max if hist is empty but should otherwise match so take the longest info
val combinedSummary = if (summaryInfo.length > fd.summaryInfo.length) summaryInfo else fd.summaryInfo
FeatureDistribution(name, key, count + fd.count, nulls + fd.nulls, combinedDist, combinedSummary)
FeatureDistribution(name, key, count + fd.count, nulls + fd.nulls, combinedDist, combinedSummary, `type`)
}

/**
@@ -135,57 +148,100 @@
}

override def toString(): String = {
s"Name=$name, Key=$key, Count=$count, Nulls=$nulls, Histogram=${distribution.toList}, BinInfo=${summaryInfo.toList}"
val valStr = Seq(
"type" -> `type`.toString,
"name" -> name,
"key" -> key,
"count" -> count.toString,
"nulls" -> nulls.toString,
"distribution" -> distribution.mkString("[", ",", "]"),
"summaryInfo" -> summaryInfo.mkString("[", ",", "]")
).map { case (n, v) => s"$n = $v" }.mkString(", ")

s"${getClass.getSimpleName}($valStr)"
}

override def equals(that: Any): Boolean = that match {
case FeatureDistribution(`name`, `key`, `count`, `nulls`, d, s, `type`) =>
distribution.deep == d.deep && summaryInfo.deep == s.deep
case _ => false
}

override def hashCode(): Int = Objects.hashCode(name, key, count, nulls, distribution, summaryInfo, `type`)
}

private[op] object FeatureDistribution {
object FeatureDistribution {

val MaxBins = 100000

implicit val semigroup: Semigroup[FeatureDistribution] = new Semigroup[FeatureDistribution] {
override def plus(l: FeatureDistribution, r: FeatureDistribution) = l.reduce(r)
override def plus(l: FeatureDistribution, r: FeatureDistribution): FeatureDistribution = l.reduce(r)
}

implicit val formats: Formats = DefaultFormats +
EnumEntrySerializer.json4s[FeatureDistributionType](FeatureDistributionType)

/**
* Feature distributions to json
*
* @param fd feature distributions
* @return json array
*/
def toJson(fd: Array[FeatureDistribution]): String = Serialization.write[Array[FeatureDistribution]](fd)

/**
* Feature distributions from json
*
* @param json feature distributions json
* @return feature distributions array
*/
def fromJson(json: String): Try[Array[FeatureDistribution]] = Try {
Serialization.read[Array[FeatureDistribution]](json)
}
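A round-trip sketch for the new JSON helpers; the values are made up, and the assertion leans on the content-based `equals` defined above:

```scala
import com.salesforce.op.features.FeatureDistributionType
import com.salesforce.op.filters.FeatureDistribution

val dists = Array(
  FeatureDistribution("age", None, 10L, 1L, Array(3.0, 6.0), Array(0.0, 100.0),
    FeatureDistributionType.Scoring)
)

val json: String = FeatureDistribution.toJson(dists) // JSON array as a string
val restored = FeatureDistribution.fromJson(json)    // Try[Array[FeatureDistribution]]
assert(restored.toOption.exists(_.sameElements(dists)))
```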

/**
* Facilitates feature distribution retrieval from computed feature summaries
*
* @param featureKey feature key
* @param summary feature summary
* @param value optional processed sequence
* @param bins number of histogram bins
* @param featureKey feature key
* @param summary feature summary
* @param value optional processed sequence
* @param bins number of histogram bins
* @param textBinsFormula formula to compute the text features bin size.
* Input arguments are [[Summary]] and number of bins to use in computing feature distributions
* (histograms for numerics, hashes for strings). Output is the bins for the text features.
* @return a pair consisting of response and predictor feature distributions (in this order)
* @param `type` feature distribution type: training or scoring
* @return feature distribution given the provided information
*/
def apply(
private[op] def fromSummary(
featureKey: FeatureKey,
summary: Summary,
value: Option[ProcessedSeq],
bins: Int,
textBinsFormula: (Summary, Int) => Int
textBinsFormula: (Summary, Int) => Int,
`type`: FeatureDistributionType
): FeatureDistribution = {
val (nullCount, (summaryInfo, distribution)): (Int, (Array[Double], Array[Double])) =
value.map(seq => 0 -> histValues(seq, summary, bins, textBinsFormula))
.getOrElse(1 -> (Array(summary.min, summary.max, summary.sum, summary.count) -> new Array[Double](bins)))
val (name, key) = featureKey
val (nullCount, (summaryInfo, distribution)) =
value.map(seq => 0L -> histValues(seq, summary, bins, textBinsFormula))
.getOrElse(1L -> (Array(summary.min, summary.max, summary.sum, summary.count) -> new Array[Double](bins)))

FeatureDistribution(
name = featureKey._1,
key = featureKey._2,
count = 1,
name = name,
key = key,
count = 1L,
nulls = nullCount,
summaryInfo = summaryInfo,
distribution = distribution)
distribution = distribution,
`type` = `type`
)
}

/**
* Function to put data into histogram of counts
*
* @param values values to bin
* @param summary summary info for feature (max, min, etc)
* @param bins number of bins to produce
* @param values values to bin
* @param summary summary info for feature (max, min, etc)
* @param bins number of bins to produce
* @param textBinsFormula formula to compute the text features bin size.
* Input arguments are [[Summary]] and number of bins to use in computing feature distributions
* (histograms for numerics, hashes for strings). Output is the bins for the text features.