Fixed issues after rebasing from master (after move from SchemaRDD to DataFrame)
jkbradley committed Feb 5, 2015
1 parent 9872424 · commit bcb9549
Showing 10 changed files with 79 additions and 134 deletions.
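The change that repeats across these files is the move from the old SchemaRDD/Catalyst DSL (Star(None), .attr, .call(...)) to the DataFrame API (select($"*", ...), callUDF, column lookup via df(colName)). A rough sketch of that pattern against the Spark 1.3-era API follows; the helper and column names are illustrative, not part of the commit.

    // Illustrative sketch only: the DataFrame column-append pattern this commit adopts.
    // Assumes the Spark 1.3-era spark.sql API used elsewhere in this diff.
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.Dsl._            // provides $"..." and callUDF
    import org.apache.spark.sql.types.DoubleType

    object MigrationSketch {
      // Old style (SchemaRDD + Catalyst DSL), roughly:
      //   data.select(Star(None), predict.call(map(featuresCol).attr) as map(predictionCol))
      // New style (DataFrame):
      def appendPrediction(
          data: DataFrame,
          featuresCol: String,
          predictionCol: String,
          predict: Vector => Double): DataFrame = {
        // Keep all existing columns and append one computed by a UDF over the features column.
        data.select($"*", callUDF(predict, DoubleType, data(featuresCol)).as(predictionCol))
      }
    }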
----- changed file -----
@@ -21,9 +21,9 @@ import org.apache.spark.annotation.{DeveloperApi, AlphaComponent}
import org.apache.spark.ml.impl.estimator.{PredictionModel, Predictor, PredictorParams}
import org.apache.spark.ml.param.{Params, ParamMap, HasRawPredictionCol}
import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
- import org.apache.spark.sql._
- import org.apache.spark.sql.catalyst.analysis.Star
- import org.apache.spark.sql.types.{DataType, StructType}
+ import org.apache.spark.sql.Dsl._
+ import org.apache.spark.sql.DataFrame
+ import org.apache.spark.sql.types.{DataType, DoubleType, StructType}


/**
@@ -95,7 +95,7 @@ abstract class ClassificationModel[FeaturesType, M <: ClassificationModel[Featur
* @param paramMap additional parameters, overwrite embedded params
* @return transformed dataset
*/
- override def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD = {
+ override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = {
// This default implementation should be overridden as needed.

// Check schema
@@ -162,12 +162,9 @@ private[ml] object ClassificationModel {
* @return (number of columns added, transformed dataset)
*/
private[ml] def transformColumnsImpl[FeaturesType](
- dataset: SchemaRDD,
+ dataset: DataFrame,
model: ClassificationModel[FeaturesType, _],
- map: ParamMap): (Int, SchemaRDD) = {
-
- import org.apache.spark.sql.catalyst.dsl._
- import dataset.sqlContext._
+ map: ParamMap): (Int, DataFrame) = {

// Output selected columns only.
// This is a bit complicated since it tries to avoid repeated computation.
@@ -176,22 +173,25 @@
if (map(model.rawPredictionCol) != "") {
// output raw prediction
val features2raw: FeaturesType => Vector = model.predictRaw
- tmpData = tmpData.select(Star(None),
- features2raw.call(map(model.featuresCol).attr) as map(model.rawPredictionCol))
+ tmpData = tmpData.select($"*",
+ callUDF(features2raw, new VectorUDT,
+ tmpData(map(model.featuresCol))).as(map(model.rawPredictionCol)))
numColsOutput += 1
if (map(model.predictionCol) != "") {
val raw2pred: Vector => Double = (rawPred) => {
rawPred.toArray.zipWithIndex.maxBy(_._1)._2
}
- tmpData = tmpData.select(Star(None),
- raw2pred.call(map(model.rawPredictionCol).attr) as map(model.predictionCol))
+ tmpData = tmpData.select($"*",
+ callUDF(raw2pred, DoubleType,
+ tmpData(map(model.rawPredictionCol))).as(map(model.predictionCol)))
numColsOutput += 1
}
} else if (map(model.predictionCol) != "") {
// output prediction
val features2pred: FeaturesType => Double = model.predict
- tmpData = tmpData.select(Star(None),
- features2pred.call(map(model.featuresCol).attr) as map(model.predictionCol))
+ tmpData = tmpData.select($"*",
+ callUDF(features2pred, DoubleType,
+ tmpData(map(model.featuresCol))).as(map(model.predictionCol)))
numColsOutput += 1
}
(numColsOutput, tmpData)
----- changed file -----
@@ -20,12 +20,10 @@ package org.apache.spark.ml.classification
import org.apache.spark.annotation.AlphaComponent
import org.apache.spark.ml.param._
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
- import org.apache.spark.mllib.linalg.{BLAS, Vector, Vectors}
- import org.apache.spark.sql._
+ import org.apache.spark.mllib.linalg.{VectorUDT, BLAS, Vector, Vectors}
+ import org.apache.spark.sql.DataFrame
+ import org.apache.spark.sql.Dsl._
- import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
- import org.apache.spark.sql.catalyst.analysis.Star
- import org.apache.spark.sql.catalyst.dsl._
+ import org.apache.spark.sql.types.DoubleType
import org.apache.spark.storage.StorageLevel


@@ -55,10 +53,10 @@ class LogisticRegression
def setMaxIter(value: Int): this.type = set(maxIter, value)
def setThreshold(value: Double): this.type = set(threshold, value)

- override protected def train(dataset: SchemaRDD, paramMap: ParamMap): LogisticRegressionModel = {
+ override protected def train(dataset: DataFrame, paramMap: ParamMap): LogisticRegressionModel = {
// Extract columns from data. If dataset is persisted, do not persist oldDataset.
val oldDataset = extractLabeledPoints(dataset, paramMap)
- val handlePersistence = dataset.getStorageLevel == StorageLevel.NONE
+ val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
if (handlePersistence) {
oldDataset.persist(StorageLevel.MEMORY_AND_DISK)
}
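Since DataFrame no longer extends RDD, the persistence check above goes through the DataFrame's underlying RDD. A minimal sketch of that idea (illustrative helper, not part of the commit):

    // Sketch: cache the extracted training RDD only when the source DataFrame's
    // underlying RDD is not already persisted, as the train() methods in this diff do.
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.storage.StorageLevel

    def cacheIfNeeded(dataset: DataFrame, extracted: RDD[LabeledPoint]): Boolean = {
      // dataset.rdd exposes the DataFrame's rows as an RDD, which carries the storage level.
      val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
      if (handlePersistence) {
        extracted.persist(StorageLevel.MEMORY_AND_DISK)
      }
      handlePersistence // caller should unpersist `extracted` after training when this is true
    }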
@@ -106,25 +104,10 @@ class LogisticRegressionModel private[ml] (
1.0 / (1.0 + math.exp(-m))
}

- /*
override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = {
- transformSchema(dataset.schema, paramMap, logging = true)
- val map = this.paramMap ++ paramMap
- val scoreFunction = udf { v: Vector =>
- val margin = BLAS.dot(v, weights)
- 1.0 / (1.0 + math.exp(-margin))
- }
- val t = map(threshold)
- val predictFunction: Double => Double = (score) => { if (score > t) 1.0 else 0.0 }
- dataset
- .select($"*", callUDF(scoreFunction, col(map(featuresCol))).as(map(scoreCol)))
- .select($"*", callUDF(predictFunction, col(map(scoreCol))).as(map(predictionCol)))
- */
- override def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD = {
// Check schema
transformSchema(dataset.schema, paramMap, logging = true)

- import dataset.sqlContext._
val map = this.paramMap ++ paramMap

// Output selected columns only.
@@ -136,8 +119,8 @@
var numColsOutput = 0
if (map(rawPredictionCol) != "") {
val features2raw: Vector => Vector = predictRaw
- tmpData = tmpData.select(Star(None),
- features2raw.call(map(featuresCol).attr) as map(rawPredictionCol))
+ tmpData = tmpData.select($"*",
+ callUDF(features2raw, new VectorUDT, tmpData(map(featuresCol))).as(map(rawPredictionCol)))
numColsOutput += 1
}
if (map(probabilityCol) != "") {
@@ -146,12 +129,12 @@
val prob1 = 1.0 / (1.0 + math.exp(-rawPreds(1)))
Vectors.dense(1.0 - prob1, prob1)
}
- tmpData = tmpData.select(Star(None),
- raw2prob.call(map(rawPredictionCol).attr) as map(probabilityCol))
+ tmpData = tmpData.select($"*",
+ callUDF(raw2prob, new VectorUDT, tmpData(map(rawPredictionCol))).as(map(probabilityCol)))
} else {
val features2prob: Vector => Vector = predictProbabilities
- tmpData = tmpData.select(Star(None),
- features2prob.call(map(featuresCol).attr) as map(probabilityCol))
+ tmpData = tmpData.select($"*",
+ callUDF(features2prob, new VectorUDT, tmpData(map(featuresCol))).as(map(probabilityCol)))
}
numColsOutput += 1
}
@@ -161,19 +144,19 @@
val predict: Vector => Double = (probs) => {
if (probs(1) > t) 1.0 else 0.0
}
- tmpData = tmpData.select(Star(None),
- predict.call(map(probabilityCol).attr) as map(predictionCol))
+ tmpData = tmpData.select($"*",
+ callUDF(predict, DoubleType, tmpData(map(probabilityCol))).as(map(predictionCol)))
} else if (map(rawPredictionCol) != "") {
val predict: Vector => Double = (rawPreds) => {
val prob1 = 1.0 / (1.0 + math.exp(-rawPreds(1)))
if (prob1 > t) 1.0 else 0.0
}
- tmpData = tmpData.select(Star(None),
- predict.call(map(rawPredictionCol).attr) as map(predictionCol))
+ tmpData = tmpData.select($"*",
+ callUDF(predict, DoubleType, tmpData(map(rawPredictionCol))).as(map(predictionCol)))
} else {
val predict: Vector => Double = this.predict
- tmpData = tmpData.select(Star(None),
- predict.call(map(featuresCol).attr) as map(predictionCol))
+ tmpData = tmpData.select($"*",
+ callUDF(predict, DoubleType, tmpData(map(featuresCol))).as(map(predictionCol)))
}
numColsOutput += 1
}
----- changed file -----
@@ -20,8 +20,8 @@ package org.apache.spark.ml.classification
import org.apache.spark.annotation.{AlphaComponent, DeveloperApi}
import org.apache.spark.ml.param.{HasProbabilityCol, ParamMap, Params}
import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
- import org.apache.spark.sql._
- import org.apache.spark.sql.catalyst.analysis.Star
+ import org.apache.spark.sql.DataFrame
+ import org.apache.spark.sql.Dsl._
import org.apache.spark.sql.types.{DataType, StructType}


@@ -91,10 +91,8 @@ abstract class ProbabilisticClassificationModel[
* @param paramMap additional parameters, overwrite embedded params
* @return transformed dataset
*/
- override def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD = {
+ override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = {
// This default implementation should be overridden as needed.
- import dataset.sqlContext._
- import org.apache.spark.sql.catalyst.dsl._

// Check schema
transformSchema(dataset.schema, paramMap, logging = true)
@@ -118,8 +116,9 @@
val features2probs: FeaturesType => Vector = (features) => {
tmpModel.predictProbabilities(features)
}
- outputData.select(Star(None),
- features2probs.call(map(featuresCol).attr) as map(probabilityCol))
+ outputData.select($"*",
+ callUDF(features2probs, new VectorUDT,
+ outputData(map(featuresCol))).as(map(probabilityCol)))
} else {
if (numColsOutput == 0) {
this.logWarning(s"$uid: ProbabilisticClassificationModel.transform() was called as NOOP" +
----- changed file -----
@@ -18,11 +18,11 @@
package org.apache.spark.ml.evaluation

import org.apache.spark.annotation.AlphaComponent
- import org.apache.spark.ml._
+ import org.apache.spark.ml.Evaluator
import org.apache.spark.ml.param._
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
- import org.apache.spark.sql.{DataFrame, Row}
+ import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
+ import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.DoubleType


@@ -52,7 +52,7 @@ class BinaryClassificationEvaluator extends Evaluator with Params
checkInputColumn(schema, map(labelCol), DoubleType)

// TODO: When dataset metadata has been implemented, check rawPredictionCol vector length = 2.
- val scoreAndLabels = dataset.select(map(rawPredictionCol).attr, map(labelCol).attr)
+ val scoreAndLabels = dataset.select(map(rawPredictionCol), map(labelCol))
.map { case Row(rawPrediction: Vector, label: Double) =>
(rawPrediction(1), label)
}
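With column access by name replacing .attr, pulling (score, label) pairs out of a scored DataFrame stays a one-liner. A hedged sketch of the evaluation step, with illustrative parameter names (not part of the commit):

    // Compute areaUnderROC from a scored DataFrame, mirroring the select + Row
    // pattern match used by the evaluator above.
    import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.sql.{DataFrame, Row}

    def areaUnderROC(scored: DataFrame, rawPredictionCol: String, labelCol: String): Double = {
      val scoreAndLabels = scored.select(rawPredictionCol, labelCol).map {
        // rawPrediction is a 2-element vector; element 1 is the score for the positive class
        case Row(rawPrediction: Vector, label: Double) => (rawPrediction(1), label)
      }
      new BinaryClassificationMetrics(scoreAndLabels).areaUnderROC()
    }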
----- changed file -----
@@ -23,8 +23,8 @@ import org.apache.spark.ml.param._
import org.apache.spark.mllib.linalg.{VectorUDT, Vector}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
- import org.apache.spark.sql._
- import org.apache.spark.sql.catalyst.analysis.Star
+ import org.apache.spark.sql.{DataFrame, Row}
+ import org.apache.spark.sql.Dsl._
import org.apache.spark.sql.types.{DataType, DoubleType, StructType}


@@ -85,7 +85,7 @@ abstract class Predictor[
def setFeaturesCol(value: String): Learner = set(featuresCol, value).asInstanceOf[Learner]
def setPredictionCol(value: String): Learner = set(predictionCol, value).asInstanceOf[Learner]

- override def fit(dataset: SchemaRDD, paramMap: ParamMap): M = {
+ override def fit(dataset: DataFrame, paramMap: ParamMap): M = {
// This handles a few items such as schema validation.
// Developers only need to implement train().
transformSchema(dataset.schema, paramMap, logging = true)
@@ -108,7 +108,7 @@
* @return Fitted model
*/
@DeveloperApi
- protected def train(dataset: SchemaRDD, paramMap: ParamMap): M
+ protected def train(dataset: DataFrame, paramMap: ParamMap): M

/**
* :: DeveloperApi ::
@@ -131,10 +131,9 @@
* Extract [[labelCol]] and [[featuresCol]] from the given dataset,
* and put it in an RDD with strong types.
*/
- protected def extractLabeledPoints(dataset: SchemaRDD, paramMap: ParamMap): RDD[LabeledPoint] = {
- import dataset.sqlContext._
+ protected def extractLabeledPoints(dataset: DataFrame, paramMap: ParamMap): RDD[LabeledPoint] = {
val map = this.paramMap ++ paramMap
- dataset.select(map(labelCol).attr, map(featuresCol).attr)
+ dataset.select(map(labelCol), map(featuresCol))
.map { case Row(label: Double, features: Vector) =>
LabeledPoint(label, features)
}
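The same select-then-pattern-match idiom gives a typed RDD view of a DataFrame; a standalone sketch with illustrative names (not part of the commit):

    // Convert a DataFrame with (label: Double, features: Vector) columns into a
    // strongly typed RDD[LabeledPoint], as extractLabeledPoints above does.
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Row}

    def toLabeledPoints(dataset: DataFrame, labelCol: String, featuresCol: String): RDD[LabeledPoint] = {
      dataset.select(labelCol, featuresCol).map {
        case Row(label: Double, features: Vector) => LabeledPoint(label, features)
      }
    }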
@@ -184,10 +183,8 @@ abstract class PredictionModel[FeaturesType, M <: PredictionModel[FeaturesType,
* @param paramMap additional parameters, overwrite embedded params
* @return transformed dataset with [[predictionCol]] of type [[Double]]
*/
- override def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD = {
+ override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = {
// This default implementation should be overridden as needed.
- import org.apache.spark.sql.catalyst.dsl._
- import dataset.sqlContext._

// Check schema
transformSchema(dataset.schema, paramMap, logging = true)
@@ -206,7 +203,8 @@
val pred: FeaturesType => Double = (features) => {
tmpModel.predict(features)
}
- dataset.select(Star(None), pred.call(map(featuresCol).attr) as map(predictionCol))
+ dataset.select($"*",
+ callUDF(pred, DoubleType, dataset(map(featuresCol))).as(map(predictionCol)))
} else {
this.logWarning(s"$uid: Predictor.transform() was called as NOOP" +
" since no output columns were set.")
----- changed file -----
@@ -21,7 +21,7 @@ import org.apache.spark.annotation.AlphaComponent
import org.apache.spark.ml.param.{Params, ParamMap, HasMaxIter, HasRegParam}
import org.apache.spark.mllib.linalg.{BLAS, Vector}
import org.apache.spark.mllib.regression.LinearRegressionWithSGD
- import org.apache.spark.sql._
+ import org.apache.spark.sql.DataFrame
import org.apache.spark.storage.StorageLevel


@@ -47,10 +47,10 @@ class LinearRegression extends Regressor[Vector, LinearRegression, LinearRegress
def setRegParam(value: Double): this.type = set(regParam, value)
def setMaxIter(value: Int): this.type = set(maxIter, value)

- override protected def train(dataset: SchemaRDD, paramMap: ParamMap): LinearRegressionModel = {
+ override protected def train(dataset: DataFrame, paramMap: ParamMap): LinearRegressionModel = {
// Extract columns from data. If dataset is persisted, do not persist oldDataset.
val oldDataset = extractLabeledPoints(dataset, paramMap)
- val handlePersistence = dataset.getStorageLevel == StorageLevel.NONE
+ val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
if (handlePersistence) {
oldDataset.persist(StorageLevel.MEMORY_AND_DISK)
}
----- changed file -----
@@ -19,7 +19,6 @@

import java.io.Serializable;
import java.lang.Math;
- import java.util.ArrayList;
import java.util.List;

import org.junit.After;
@@ -28,12 +27,11 @@

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.mllib.regression.LabeledPoint;
- import org.apache.spark.sql.DataFrame;
- import org.apache.spark.sql.SQLContext;
import static org.apache.spark.mllib.classification.LogisticRegressionSuite.generateLogisticInputAsList;
import org.apache.spark.mllib.linalg.Vector;
+ import org.apache.spark.mllib.regression.LabeledPoint;
+ import org.apache.spark.sql.DataFrame;
+ import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.Row;


@@ -50,11 +48,7 @@ public class JavaLogisticRegressionSuite implements Serializable {
public void setUp() {
jsc = new JavaSparkContext("local", "JavaLogisticRegressionSuite");
jsql = new SQLContext(jsc);
- List<LabeledPoint> points = new ArrayList<LabeledPoint>();
- for (org.apache.spark.mllib.regression.LabeledPoint lp:
- generateLogisticInputAsList(1.0, 1.0, 100, 42)) {
- points.add(new LabeledPoint(lp.label(), lp.features()));
- }
+ List<LabeledPoint> points = generateLogisticInputAsList(1.0, 1.0, 100, 42);
datasetRDD = jsc.parallelize(points, 2);
dataset = jsql.applySchema(datasetRDD, LabeledPoint.class);
dataset.registerTempTable("dataset");
@@ -98,21 +92,14 @@ public void logisticRegressionWithSetters() {
// Modify model params, and check that the params worked.
model.setThreshold(1.0);
model.transform(dataset).registerTempTable("predAllZero");
- SchemaRDD predAllZero = jsql.sql("SELECT prediction, myProbability FROM predAllZero");
+ DataFrame predAllZero = jsql.sql("SELECT prediction, myProbability FROM predAllZero");
for (Row r: predAllZero.collectAsList()) {
assert(r.getDouble(0) == 0.0);
}
// Call transform with params, and check that the params worked.
- /* TODO: USE THIS
- model.transform(dataset, model.threshold().w(0.8)) // overwrite threshold
- .registerTempTable("prediction");
- DataFrame predictions = jsql.sql("SELECT label, score, prediction FROM prediction");
- predictions.collectAsList();
- */
-
model.transform(dataset, model.threshold().w(0.0), model.probabilityCol().w("myProb"))
.registerTempTable("predNotAllZero");
- SchemaRDD predNotAllZero = jsql.sql("SELECT prediction, myProb FROM predNotAllZero");
+ DataFrame predNotAllZero = jsql.sql("SELECT prediction, myProb FROM predNotAllZero");
boolean foundNonZero = false;
for (Row r: predNotAllZero.collectAsList()) {
if (r.getDouble(0) != 0.0) foundNonZero = true;
@@ -137,7 +124,7 @@ public void logisticRegressionPredictorClassifierMethods() {
assert(model.numClasses() == 2);

model.transform(dataset).registerTempTable("transformed");
- SchemaRDD trans1 = jsql.sql("SELECT rawPrediction, probability FROM transformed");
+ DataFrame trans1 = jsql.sql("SELECT rawPrediction, probability FROM transformed");
for (Row row: trans1.collect()) {
Vector raw = (Vector)row.get(0);
Vector prob = (Vector)row.get(1);
@@ -148,7 +135,7 @@
assert(Math.abs(prob.apply(0) - (1.0 - probFromRaw1)) < eps);
}

- SchemaRDD trans2 = jsql.sql("SELECT prediction, probability FROM transformed");
+ DataFrame trans2 = jsql.sql("SELECT prediction, probability FROM transformed");
for (Row row: trans2.collect()) {
double pred = row.getDouble(0);
Vector prob = (Vector)row.get(1);
(Diff for the remaining changed files did not load in this capture.)
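For reference, a rough Scala equivalent of the flow the Java suite above exercises with the DataFrame-based API. This is illustrative only and not part of the commit; the column names follow the defaults queried in the tests, and the parameter values are arbitrary.

    // Sketch: fit and score with the DataFrame-based spark.ml API.
    import org.apache.spark.ml.classification.LogisticRegression
    import org.apache.spark.sql.{DataFrame, Row}

    def fitAndInspect(training: DataFrame): Unit = {
      val lr = new LogisticRegression()
        .setMaxIter(10)
        .setThreshold(0.6)
      val model = lr.fit(training)                  // fit() now takes a DataFrame
      val predictions = model.transform(training)   // transform() returns a DataFrame
      predictions.select("rawPrediction", "probability", "prediction").collect().foreach {
        case Row(raw, prob, pred) => println(s"raw=$raw prob=$prob pred=$pred")
      }
    }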