[ML-13] Add ALS with new oneCCL APIs (#14)
* Add missing build.sh

* Add ALS with oneDAL backend

* Add IntelALSSuite

* Fix shuffle_all2all function declaration

* Rename ALS rank to nFactors to avoid a name conflict with the oneCCL rank

* Fix test.sh

* Use repartition to work around uneven partitioning (a minimal sketch follows the change summary below)
xwu99 authored Mar 4, 2021
1 parent 8ecbc51 commit 3252ae9
Showing 24 changed files with 4,912 additions and 4 deletions.
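The last bullet in the commit message refers to using repartition to work around uneven partitioning. Below is a minimal, hypothetical Scala sketch of that general technique; it is not the code from this commit, and the object name, local master, and partition counts are illustrative assumptions only.

import org.apache.spark.sql.SparkSession

// Hypothetical sketch only -- not part of this commit.
// repartition() forces a full shuffle that redistributes rows evenly
// across the requested number of partitions, so downstream tasks (and,
// in oap-mllib's case, the oneCCL ranks fed from them) receive a
// comparable share of the data even when the input is skewed.
object RepartitionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("RepartitionSketch")
      .master("local[*]") // local master only for this illustration
      .getOrCreate()
    val sc = spark.sparkContext

    // Build a deliberately skewed RDD: one fat partition plus two tiny ones.
    val skewed = sc.parallelize(Seq.fill(100000)(1), 1) ++ sc.parallelize(Seq(2, 3), 2)

    // Rebalance across a fixed number of partitions via a full shuffle.
    val balanced = skewed.repartition(12)

    // Compare per-partition row counts before and after the shuffle.
    println(skewed.mapPartitions(it => Iterator(it.size)).collect().mkString(", "))
    println(balanced.mapPartitions(it => Iterator(it.size)).collect().mkString(", "))

    spark.stop()
  }
}

repartition(n) is shorthand for coalesce(n, shuffle = true): the shuffle cost is paid once up front in exchange for evenly sized partitions downstream.
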
3 changes: 3 additions & 0 deletions examples/als-hibench/build.sh
@@ -0,0 +1,3 @@
#!/usr/bin/env bash

mvn clean package
100 changes: 100 additions & 0 deletions examples/als-hibench/pom.xml
@@ -0,0 +1,100 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.intel.oap</groupId>
<artifactId>oap-mllib-examples</artifactId>
<version>0.9.0-with-spark-3.0.0</version>
<packaging>jar</packaging>

<name>ALSHiBenchExample</name>
<url>https://github.com/Intel-bigdata/OAP</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<scala.version>2.12.10</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<spark.version>3.0.0</spark.version>
</properties>

<dependencies>

<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.10</version>
</dependency>

<dependency>
<groupId>com.github.scopt</groupId>
<artifactId>scopt_2.12</artifactId>
<version>3.7.0</version>
</dependency>

<!--<dependency>-->
<!--<groupId>com.github.fommil.netlib</groupId>-->
<!--<artifactId>all</artifactId>-->
<!--<version>1.1.2</version> -->
<!--<type>pom</type>-->
<!--</dependency>-->

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.12</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
<args>
<arg>-target:jvm-1.8</arg>
</args>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
73 changes: 73 additions & 0 deletions examples/als-hibench/run-hibench-oap-mllib.sh
@@ -0,0 +1,73 @@
#!/usr/bin/env bash

export HDFS_ROOT=hdfs://sr591:8020
export OAP_MLLIB_ROOT=/home/xiaochang/Works/OAP-xwu99-als/oap-mllib

SPARK_MASTER=yarn
SPARK_DRIVER_MEMORY=16G
SPARK_NUM_EXECUTORS=6
SPARK_EXECUTOR_CORES=28
SPARK_EXECUTOR_MEMORY_OVERHEAD=25G
SPARK_EXECUTOR_MEMORY=100G

SPARK_DEFAULT_PARALLELISM=$(expr $SPARK_NUM_EXECUTORS '*' $SPARK_EXECUTOR_CORES '*' 2) # e.g. 6 executors * 28 cores * 2 = 336 partitions
#SPARK_DEFAULT_PARALLELISM=$(expr $SPARK_NUM_EXECUTORS '*' $SPARK_EXECUTOR_CORES)

# ======================================================= #

# for log suffix
SUFFIX=$( basename -s .sh "${BASH_SOURCE[0]}" )

# Check envs
if [[ -z $SPARK_HOME ]]; then
    echo "SPARK_HOME not defined!"
    exit 1
fi

if [[ -z $HADOOP_HOME ]]; then
    echo "HADOOP_HOME not defined!"
    exit 1
fi

export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

# Target jar built
OAP_MLLIB_JAR_NAME=oap-mllib-0.9.0-with-spark-3.0.0.jar
OAP_MLLIB_JAR=$OAP_MLLIB_ROOT/mllib-dal/target/$OAP_MLLIB_JAR_NAME

# Use absolute path
SPARK_DRIVER_CLASSPATH=$OAP_MLLIB_JAR
# Use relative path
SPARK_EXECUTOR_CLASSPATH=./$OAP_MLLIB_JAR_NAME

APP_JAR=target/oap-mllib-examples-0.9.0-with-spark-3.0.0.jar
APP_CLASS=com.intel.hibench.sparkbench.ml.ALSExample

HDFS_INPUT=hdfs://sr591:8020/HiBench/ALS/Input
RANK=10
NUM_ITERATIONS=1
LAMBDA=0.1
IMPLICIT=true

/usr/bin/time -p $SPARK_HOME/bin/spark-submit --master $SPARK_MASTER -v \
--num-executors $SPARK_NUM_EXECUTORS \
--driver-memory $SPARK_DRIVER_MEMORY \
--executor-cores $SPARK_EXECUTOR_CORES \
--executor-memory $SPARK_EXECUTOR_MEMORY \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf "spark.default.parallelism=$SPARK_DEFAULT_PARALLELISM" \
--conf "spark.sql.shuffle.partitions=$SPARK_DEFAULT_PARALLELISM" \
--conf "spark.driver.extraClassPath=$SPARK_DRIVER_CLASSPATH" \
--conf "spark.executor.extraClassPath=$SPARK_EXECUTOR_CLASSPATH" \
--conf "spark.shuffle.reduceLocality.enabled=false" \
--conf "spark.executor.memoryOverhead=$SPARK_EXECUTOR_MEMORY_OVERHEAD" \
--conf "spark.network.timeout=1200s" \
--conf "spark.task.maxFailures=1" \
--jars $OAP_MLLIB_JAR \
--class $APP_CLASS \
$APP_JAR \
--rank $RANK --numIterations $NUM_ITERATIONS --implicitPrefs $IMPLICIT --lambda $LAMBDA \
--numProductBlocks $SPARK_DEFAULT_PARALLELISM --numUserBlocks $SPARK_DEFAULT_PARALLELISM \
$HDFS_INPUT \
2>&1 | tee ALS-$SUFFIX-$(date +%m%d_%H_%M_%S).log

61 changes: 61 additions & 0 deletions examples/als-hibench/run-hibench-vanilla.sh
@@ -0,0 +1,61 @@
#!/usr/bin/env bash

export HDFS_ROOT=hdfs://sr591:8020

SPARK_MASTER=yarn
SPARK_DRIVER_MEMORY=16G
SPARK_NUM_EXECUTORS=6
SPARK_EXECUTOR_CORES=28
SPARK_EXECUTOR_MEMORY_OVERHEAD=25G
SPARK_EXECUTOR_MEMORY=100G

SPARK_DEFAULT_PARALLELISM=$(expr $SPARK_NUM_EXECUTORS '*' $SPARK_EXECUTOR_CORES '*' 2) # e.g. 6 executors * 28 cores * 2 = 336 partitions

# ======================================================= #

# for log suffix
SUFFIX=$( basename -s .sh "${BASH_SOURCE[0]}" )

# Check envs
if [[ -z $SPARK_HOME ]]; then
    echo "SPARK_HOME not defined!"
    exit 1
fi

if [[ -z $HADOOP_HOME ]]; then
    echo "HADOOP_HOME not defined!"
    exit 1
fi

export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

APP_JAR=target/oap-mllib-examples-0.9.0-with-spark-3.0.0.jar
APP_CLASS=com.intel.hibench.sparkbench.ml.ALSExample

HDFS_INPUT=hdfs://sr591:8020/HiBench/ALS/Input
RANK=10
NUM_ITERATIONS=1
LAMBDA=0.1
IMPLICIT=true

/usr/bin/time -p $SPARK_HOME/bin/spark-submit --master $SPARK_MASTER -v \
--num-executors $SPARK_NUM_EXECUTORS \
--driver-memory $SPARK_DRIVER_MEMORY \
--executor-cores $SPARK_EXECUTOR_CORES \
--executor-memory $SPARK_EXECUTOR_MEMORY \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf "spark.default.parallelism=$SPARK_DEFAULT_PARALLELISM" \
--conf "spark.sql.shuffle.partitions=$SPARK_DEFAULT_PARALLELISM" \
--conf "spark.driver.extraClassPath=$SPARK_DRIVER_CLASSPATH" \
--conf "spark.executor.extraClassPath=$SPARK_EXECUTOR_CLASSPATH" \
--conf "spark.shuffle.reduceLocality.enabled=false" \
--conf "spark.executor.memoryOverhead=$SPARK_EXECUTOR_MEMORY_OVERHEAD" \
--conf "spark.network.timeout=1200s" \
--conf "spark.task.maxFailures=1" \
--class $APP_CLASS \
$APP_JAR \
--rank $RANK --numIterations $NUM_ITERATIONS --implicitPrefs $IMPLICIT --lambda $LAMBDA \
--numProductBlocks $SPARK_DEFAULT_PARALLELISM --numUserBlocks $SPARK_DEFAULT_PARALLELISM \
$HDFS_INPUT \
2>&1 | tee ALS-$SUFFIX-$(date +%m%d_%H_%M_%S).log

111 changes: 111 additions & 0 deletions examples/als-hibench/src/main/scala/com/intel/hibench/sparkbench/ml/ALSExample.scala
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.hibench.sparkbench.ml

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.ml.recommendation.ALS.Rating
import org.apache.spark.sql.SparkSession
import scopt.OptionParser

object ALSExample {

case class Params(
dataPath: String = null,
numIterations: Int = 10,
lambda: Double = 0.1,
rank: Int = 10,
numUserBlocks: Int = 10,
numItemBlocks: Int = 10,
implicitPrefs: Boolean = false)

def main(args: Array[String]): Unit = {
val defaultParams = Params()

val parser = new OptionParser[Params]("ALS") {
head("ALS: an example app for ALS on User-Item data.")
opt[Int]("rank")
.text(s"rank, default: ${defaultParams.rank}")
.action((x, c) => c.copy(rank = x))
opt[Int]("numIterations")
.text(s"number of iterations, default: ${defaultParams.numIterations}")
.action((x, c) => c.copy(numIterations = x))
opt[Double]("lambda")
.text(s"regularization parameter, default: ${defaultParams.lambda}")
.action((x, c) => c.copy(lambda = x))
opt[Int]("numUserBlocks")
.text(s"number of user blocks, default: ${defaultParams.numUserBlocks}")
.action((x, c) => c.copy(numUserBlocks = x))
opt[Int]("numProductBlocks")
.text(s"number of product blocks, default: ${defaultParams.numItemBlocks}")
.action((x, c) => c.copy(numItemBlocks = x))
opt[Boolean]("implicitPrefs")
.text("implicit preference, default: ${defaultParams.implicitPrefs}")
.action((x, c) => c.copy(implicitPrefs = x))
arg[String]("<dataPath>")
.required()
.text("Input paths to a User-Product dataset of ratings")
.action((x, c) => c.copy(dataPath = x))
}
parser.parse(args, defaultParams) match {
case Some(params) => run(params)
case _ => sys.exit(1)
}
}

def run(params: Params): Unit = {
val spark = SparkSession
.builder
.appName(s"ALS with $params")
.getOrCreate()
val sc = spark.sparkContext

import spark.implicits._

val ratings = sc.objectFile[Rating[Int]](params.dataPath).toDF()

val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2), 1L)

// Build the recommendation model using ALS on the training data
val als = new ALS()
.setRank(params.rank)
.setMaxIter(params.numIterations)
.setRegParam(params.lambda)
.setImplicitPrefs(params.implicitPrefs)
.setNumUserBlocks(params.numUserBlocks)
.setNumItemBlocks(params.numItemBlocks)
.setUserCol("user")
.setItemCol("item")
.setRatingCol("rating")
val model = als.fit(training)

// Evaluate the model by computing the RMSE on the test data
// Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
model.setColdStartStrategy("drop")
val predictions = model.transform(test)

val evaluator = new RegressionEvaluator()
.setMetricName("rmse")
.setLabelCol("rating")
.setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")

spark.stop()
}
}