Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming histogram implementation #152

Merged
merged 18 commits into from
Oct 16, 2018
Merged

Streaming histogram implementation #152

merged 18 commits into from
Oct 16, 2018

Conversation

marcovivero
Copy link
Contributor

@marcovivero marcovivero commented Oct 8, 2018

Related issues
This is an enhancement.

Describe the proposed solution
Add additional StreamingHistogram implementation as specified here:

http://www.jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf

Describe alternatives you've considered
This is an alternative to the current histogram binning strategy performed in FeatureDistribution, we do a comparison on density estimation given by the two binning methods.

@marcovivero marcovivero changed the title Mv/distribution simple Streaming histogram implementation Oct 8, 2018
@codecov
Copy link

codecov bot commented Oct 8, 2018

Codecov Report

Merging #152 into master will decrease coverage by 7.39%.
The diff coverage is 93.75%.

Impacted file tree graph

@@            Coverage Diff            @@
##           master     #152     +/-   ##
=========================================
- Coverage   85.74%   78.34%   -7.4%     
=========================================
  Files         302      303      +1     
  Lines        9881     9912     +31     
  Branches      334      522    +188     
=========================================
- Hits         8472     7766    -706     
- Misses       1409     2146    +737
Impacted Files Coverage Δ
...m/salesforce/op/utils/kryo/OpKryoRegistrator.scala 97.43% <100%> (+1.6%) ⬆️
...sforce/op/utils/stats/RichStreamingHistogram.scala 87.5% <87.5%> (ø)
...ala/com/salesforce/op/testkit/InfiniteStream.scala 0% <0%> (-100%) ⬇️
...alesforce/op/cli/gen/templates/SimpleProject.scala 0% <0%> (-100%) ⬇️
...ala/com/salesforce/op/testkit/RandomIntegral.scala 0% <0%> (-100%) ⬇️
...scala/com/salesforce/op/testkit/RandomStream.scala 0% <0%> (-100%) ⬇️
...n/scala/com/salesforce/op/testkit/RandomData.scala 0% <0%> (-100%) ⬇️
...com/salesforce/op/testkit/ProbabilityOfEmpty.scala 0% <0%> (-100%) ⬇️
...om/salesforce/op/local/OpWorkflowRunnerLocal.scala 0% <0%> (-100%) ⬇️
.../scala/com/salesforce/op/cli/gen/ProblemKind.scala 0% <0%> (-100%) ⬇️
... and 24 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update a35108b...9c377ee. Read the comment docs.

@@ -29,4 +29,7 @@ dependencies {
compile "joda-time:joda-time:$jodaTimeVersion"
compile "org.joda:joda-convert:$jodaConvertVersion"

// Breeze
compile 'org.scalanlp:breeze_2.11:0.13.2'
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"org.scalanlp:breeze_$scalaVersion:$breezeVersion"

}
case b if b.isEmpty => 0.0
case b =>
val (p, m) = b(0) // We must have exactly one element
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not have this if/else in the case?

}

private def updatePoints(updates: (Double, Double)*): Unit = {
updates.foreach { case (point, ct) =>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why mutable?!

val MeanAndVariance(equiDistEmpiricalCDFMSEMean, equiDistEmpiricalCDFMSEVar, _) =
meanAndVariance(mses.map(_.equiDistEmpiricalCDFMSE))

println("-" * 50)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extract printing into a helper function in the spec + replace println with log.info

cdfTest("0.5 * Beta(5, 1) + 0.5 * Beta(1, 5)", mixture, grid, sampleSize, 400, 50)
}

private def cdfTest(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why there is so many private nested methods in this tests? it makes it very difficult to follow what actually the test does. can you please simplify if and use more of scalatest matchers syntax?


final public class HistogramJavaUtils {
final public static <T> TreeMap<Double, T> getTreeMap() {
return new TreeMap<Double, T>(getComparator());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should be good to use TreeMap since it's already has a default serializer defined - https://github.com/EsotericSoftware/kryo/blob/kryo-parent-3.0.3/src/com/esotericsoftware/kryo/Kryo.java#L207

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just please verify it.

return new TreeMap<Double, T>(getComparator());
}

final private static Comparator<Double> getComparator() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following upon our discussion: lets just define a concrete final comparator class

Copy link
Collaborator

@leahmcguire leahmcguire left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@marcovivero can you remind me why you couldnt use any existing implementations of streaming histograms?

@marcovivero
Copy link
Contributor Author

@leahmcguire I was primarily trying to avoid bringing in any additional dependencies into the project.

private[stats] def sum(bins: Array[Bin], x: Double): Double =
bins.sliding(2).foldLeft((0.0, EmptyBin, false)) { case ((s, _, done), arr) =>
arr match {
case Array((p, m)) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this case?

(newS, EmptyBin, true)

case Array((p1, m1), (p2, m2)) =>
if (x >= p1 && x < p2 && !done) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Slightly simplified?

if (done || x < p1) {
  (s, (p2, m2), true)
} else if (x >= p1 && x < p2) {
  val mx = m1 + ((m2 - m1) / (p2 - p1)) * (x - p1)
  val newS = s + (m1 / 2) + ((m1 + mx) / 2) * ((x - p1) / (p2 - p1))
  (newS, (p2, m2), true)
} else {
  (s + m1, (p2, m2), false)
}

final def empiricalCDF(x: Double): Double = StreamingHistogram.empiricalCDF(getBins, x)

private[this] def mergePoints(): Unit = if (points.size > math.max(maxBins, 2)) {
val (q1, q2) = points.descendingKeySet.descendingIterator.asScala.sliding(2).map(_ match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Counts are not considered in points merging?

@leahmcguire
Copy link
Collaborator

small dependancies for tested and maintained code are better than reinventing the wheel.

@marcovivero
Copy link
Contributor Author

@leahmcguire: per @tovbinm's suggestion I grabbed out the code from Cassandra implementation and modified a bit for this use case.

@@ -67,6 +70,15 @@ class OpKryoRegistrator extends KryoRegistrator {
classOf[GenericData.Array[_]],
new GenericJavaCollectionSerializer(classOf[java.util.ArrayList[_]])
)

// Streaming histogram registration
doClassRegistration(kryo)(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, we should not be doing this for TreeMap and mutable.WrappedArray, as those should be already registered with Kryo.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This flag causes a failure if these aren't explicitly registered here:

https://github.com/salesforce/TransmogrifAI/pull/152/files#diff-36f3a0a7cf47675bc10aac982dab7fecR51

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is weird and to my understanding should not be the case. Let's find time to look at together.


import scala.collection.JavaConverters._

object HistogramUtils {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

docs please: for object and methods.

* Yael Ben-Haim and Elad Tom-Tov, "A Streaming Parallel Decision Tree Algorithm" (2010)
* http://jmlr.csail.mit.edu/papers/volume11/ben-haim10a/ben-haim10a.pdf
*
* This implementation is from Apache Cassandra roject, release 3.11.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

project

println("Equidistant histogram density MSE mean and variance: " +
s"${result.equiDistDensityMSE.mean}, ${result.equiDistDensityMSE.variance}")
println("Equidistant histogram cdf MSE mean and variance: " +
s"${result.equiDistCdfMSE.mean}, ${result.equiDistCdfMSE.variance}")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add some asserts and change these prints to logs that can be turned on or off

*/
package com.salesforce.op.utils.stats;

import java.io.IOException;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to import java.io.IOException;

val gaussian2 = Gaussian(5, 5)
val mixture = MixtureDistribution(gaussian1, gaussian2, 0.8, sampleSize)

cdfTest("Gaussian(0, 1)", gaussian1, histogramSampleSize, 100, mcSampleSize, 50)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's not run every test 50 times. that is very slow. let's add this as val and maybe do just 3.

classOf[StreamingHistogram],
classOf[StreamingHistogramBuilder],
classOf[StreamingHistogramComparator],
classOf[TreeMap[_, _]],
Copy link
Collaborator

@tovbinm tovbinm Oct 12, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do this instead

import scala.collection.mutable.{WrappedArray => MWrappedArray}

// Streaming histogram registration

kryo.register(classOf[StreamingHistogram])
kryo.register(classOf[StreamingHistogramBuilder])
kryo.register(classOf[StreamingHistogramComparator])
kryo.register(classOf[TreeMap[_, _]], new TreeMapSerializer())

// Mutable wrapped arrays 
OpKryoClasses.WrappedArrays.foreach(kryo.register)

where

private[op] case object OpKryoClasses {
  
 val WrappedArrays: Seq[Class[_]] = Seq(
    MWrappedArray.make(Array[Boolean]()).getClass,
    MWrappedArray.make(Array[Byte]()).getClass,
    MWrappedArray.make(Array[Char]()).getClass,
    MWrappedArray.make(Array[Double]()).getClass,
    MWrappedArray.make(Array[Float]()).getClass,
    MWrappedArray.make(Array[Int]()).getClass,
    MWrappedArray.make(Array[Long]()).getClass,
    MWrappedArray.make(Array[Short]()).getClass,
    MWrappedArray.make(Array[String]()).getClass
  )
}

val gaussian = Gaussian(0, 1)(RandBasis.mt0)
val distributionName = "Gaussian(0, 1)"

val result75 = distributionTestResult(distributionName, gaussian, histogramSampleSize, 75, mcSampleSize, 5)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add val numResults = 5 in the head of the test.

Copy link
Collaborator

@tovbinm tovbinm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one minor comment about numResults, otherwise lgtm!

@@ -0,0 +1,299 @@
/*
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need to add // scalastyle:off header.matches line here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class won't go through scalastyle since in src/main/java so I don't think that's needed.

@marcovivero marcovivero merged commit 8264265 into salesforce:master Oct 16, 2018
ericwayman pushed a commit that referenced this pull request Feb 8, 2019
* Rebase adaptive histogram implementation

* Add test

* Outlier test

* Histogram final defs

* Add distribution tests

* Update distribution tests

* Use cassandra implementation

* Use correct Kryo serializers

* Clean up tests + address comments

* Address comments
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants