From 8c042f4b855145d036a11f3bdf2db0e34af458b2 Mon Sep 17 00:00:00 2001 From: "Wu, Xiaochang" Date: Wed, 4 Aug 2021 17:18:43 +0800 Subject: [PATCH] Add isOAPEnabled for kmenas, pca & als for all versions --- .../main/scala/org/apache/spark/ml/clustering/KMeans.scala | 3 ++- .../main/scala/org/apache/spark/ml/feature/PCA.scala | 3 +-- .../main/scala/org/apache/spark/ml/recommendation/ALS.scala | 2 +- .../main/scala/org/apache/spark/ml/clustering/KMeans.scala | 3 ++- .../main/scala/org/apache/spark/ml/feature/PCA.scala | 3 +-- .../main/scala/org/apache/spark/ml/recommendation/ALS.scala | 2 +- .../main/scala/org/apache/spark/ml/clustering/KMeans.scala | 3 ++- .../main/scala/org/apache/spark/ml/feature/PCA.scala | 3 +-- .../main/scala/org/apache/spark/ml/recommendation/ALS.scala | 2 +- .../main/scala/org/apache/spark/ml/clustering/KMeans.scala | 3 ++- .../main/scala/org/apache/spark/ml/feature/PCA.scala | 3 +-- .../main/scala/org/apache/spark/ml/recommendation/ALS.scala | 2 +- 12 files changed, 16 insertions(+), 16 deletions(-) diff --git a/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/clustering/KMeans.scala index dc3dbbd6e..ce0979e9d 100644 --- a/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -354,7 +354,8 @@ class KMeans @Since("1.5.0") ( val isPlatformSupported = Utils.checkClusterPlatformCompatibility( dataset.sparkSession.sparkContext) - val useKMeansDAL = isPlatformSupported && $(distanceMeasure) == "euclidean" && !handleWeight + val useKMeansDAL = Utils.isOAPEnabled() && isPlatformSupported && + $(distanceMeasure) == "euclidean" && !handleWeight val model = if (useKMeansDAL) { trainWithDAL(instances, handlePersistence) diff --git a/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/feature/PCA.scala b/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/feature/PCA.scala index 14e9a2ce1..8ca5a9b5d 100644 --- a/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/feature/PCA.scala +++ b/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/feature/PCA.scala @@ -95,12 +95,11 @@ class PCA @Since("1.5.0") ( require($(k) <= numFeatures, s"source vector size $numFeatures must be no less than k=$k") - val sc = dataset.sparkSession.sparkContext val isPlatformSupported = Utils.checkClusterPlatformCompatibility( dataset.sparkSession.sparkContext) // Call oneDAL Correlation PCA implementation when numFeatures < 65535 and fall back otherwise - val parentModel = if (numFeatures < 65535 && isPlatformSupported) { + val parentModel = if (numFeatures < 65535 && Utils.isOAPEnabled() && isPlatformSupported) { val executor_num = Utils.sparkExecutorNum(dataset.sparkSession.sparkContext) val executor_cores = Utils.sparkExecutorCores() val pca = new PCADALImpl(k = $(k), executor_num, executor_cores) diff --git a/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/recommendation/ALS.scala index e59c642c9..71195397b 100644 --- a/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -922,7 +922,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { val isPlatformSupported = DALImplUtils.checkClusterPlatformCompatibility(ratings.sparkContext) val (userIdAndFactors, itemIdAndFactors) = - if (implicitPrefs && isPlatformSupported) { + if (implicitPrefs && DALImplUtils.isOAPEnabled() && isPlatformSupported) { new ALSDALImpl(ratings, rank, maxIter, regParam, alpha, seed).train() } else { trainMLlib(ratings, rank, numUserBlocks, numItemBlocks, maxIter, regParam, implicitPrefs, diff --git a/mllib-dal/src/spark-3.0.1/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib-dal/src/spark-3.0.1/main/scala/org/apache/spark/ml/clustering/KMeans.scala index 7af0ffacf..51ea41fe8 100644 --- a/mllib-dal/src/spark-3.0.1/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib-dal/src/spark-3.0.1/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -349,7 +349,8 @@ class KMeans @Since("1.5.0") ( val isPlatformSupported = Utils.checkClusterPlatformCompatibility( dataset.sparkSession.sparkContext) - val useKMeansDAL = isPlatformSupported && $(distanceMeasure) == "euclidean" && !handleWeight + val useKMeansDAL = Utils.isOAPEnabled() && isPlatformSupported && + $(distanceMeasure) == "euclidean" && !handleWeight val model = if (useKMeansDAL) { trainWithDAL(instances, handlePersistence) diff --git a/mllib-dal/src/spark-3.0.1/main/scala/org/apache/spark/ml/feature/PCA.scala b/mllib-dal/src/spark-3.0.1/main/scala/org/apache/spark/ml/feature/PCA.scala index 14e9a2ce1..8ca5a9b5d 100644 --- a/mllib-dal/src/spark-3.0.1/main/scala/org/apache/spark/ml/feature/PCA.scala +++ b/mllib-dal/src/spark-3.0.1/main/scala/org/apache/spark/ml/feature/PCA.scala @@ -95,12 +95,11 @@ class PCA @Since("1.5.0") ( require($(k) <= numFeatures, s"source vector size $numFeatures must be no less than k=$k") - val sc = dataset.sparkSession.sparkContext val isPlatformSupported = Utils.checkClusterPlatformCompatibility( dataset.sparkSession.sparkContext) // Call oneDAL Correlation PCA implementation when numFeatures < 65535 and fall back otherwise - val parentModel = if (numFeatures < 65535 && isPlatformSupported) { + val parentModel = if (numFeatures < 65535 && Utils.isOAPEnabled() && isPlatformSupported) { val executor_num = Utils.sparkExecutorNum(dataset.sparkSession.sparkContext) val executor_cores = Utils.sparkExecutorCores() val pca = new PCADALImpl(k = $(k), executor_num, executor_cores) diff --git a/mllib-dal/src/spark-3.0.1/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib-dal/src/spark-3.0.1/main/scala/org/apache/spark/ml/recommendation/ALS.scala index e59c642c9..71195397b 100644 --- a/mllib-dal/src/spark-3.0.1/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib-dal/src/spark-3.0.1/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -922,7 +922,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { val isPlatformSupported = DALImplUtils.checkClusterPlatformCompatibility(ratings.sparkContext) val (userIdAndFactors, itemIdAndFactors) = - if (implicitPrefs && isPlatformSupported) { + if (implicitPrefs && DALImplUtils.isOAPEnabled() && isPlatformSupported) { new ALSDALImpl(ratings, rank, maxIter, regParam, alpha, seed).train() } else { trainMLlib(ratings, rank, numUserBlocks, numItemBlocks, maxIter, regParam, implicitPrefs, diff --git a/mllib-dal/src/spark-3.0.2/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib-dal/src/spark-3.0.2/main/scala/org/apache/spark/ml/clustering/KMeans.scala index 54b406f3e..27df400a8 100644 --- a/mllib-dal/src/spark-3.0.2/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib-dal/src/spark-3.0.2/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -349,7 +349,8 @@ class KMeans @Since("1.5.0") ( val isPlatformSupported = Utils.checkClusterPlatformCompatibility( dataset.sparkSession.sparkContext) - val useKMeansDAL = isPlatformSupported && $(distanceMeasure) == "euclidean" && !handleWeight + val useKMeansDAL = Utils.isOAPEnabled() && isPlatformSupported && + $(distanceMeasure) == "euclidean" && !handleWeight val model = if (useKMeansDAL) { trainWithDAL(instances, handlePersistence) diff --git a/mllib-dal/src/spark-3.0.2/main/scala/org/apache/spark/ml/feature/PCA.scala b/mllib-dal/src/spark-3.0.2/main/scala/org/apache/spark/ml/feature/PCA.scala index 14e9a2ce1..8ca5a9b5d 100644 --- a/mllib-dal/src/spark-3.0.2/main/scala/org/apache/spark/ml/feature/PCA.scala +++ b/mllib-dal/src/spark-3.0.2/main/scala/org/apache/spark/ml/feature/PCA.scala @@ -95,12 +95,11 @@ class PCA @Since("1.5.0") ( require($(k) <= numFeatures, s"source vector size $numFeatures must be no less than k=$k") - val sc = dataset.sparkSession.sparkContext val isPlatformSupported = Utils.checkClusterPlatformCompatibility( dataset.sparkSession.sparkContext) // Call oneDAL Correlation PCA implementation when numFeatures < 65535 and fall back otherwise - val parentModel = if (numFeatures < 65535 && isPlatformSupported) { + val parentModel = if (numFeatures < 65535 && Utils.isOAPEnabled() && isPlatformSupported) { val executor_num = Utils.sparkExecutorNum(dataset.sparkSession.sparkContext) val executor_cores = Utils.sparkExecutorCores() val pca = new PCADALImpl(k = $(k), executor_num, executor_cores) diff --git a/mllib-dal/src/spark-3.0.2/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib-dal/src/spark-3.0.2/main/scala/org/apache/spark/ml/recommendation/ALS.scala index e59c642c9..71195397b 100644 --- a/mllib-dal/src/spark-3.0.2/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib-dal/src/spark-3.0.2/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -922,7 +922,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { val isPlatformSupported = DALImplUtils.checkClusterPlatformCompatibility(ratings.sparkContext) val (userIdAndFactors, itemIdAndFactors) = - if (implicitPrefs && isPlatformSupported) { + if (implicitPrefs && DALImplUtils.isOAPEnabled() && isPlatformSupported) { new ALSDALImpl(ratings, rank, maxIter, regParam, alpha, seed).train() } else { trainMLlib(ratings, rank, numUserBlocks, numItemBlocks, maxIter, regParam, implicitPrefs, diff --git a/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/clustering/KMeans.scala index 0878c146d..70f7c445f 100644 --- a/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -348,7 +348,8 @@ class KMeans @Since("1.5.0") ( val isPlatformSupported = Utils.checkClusterPlatformCompatibility( dataset.sparkSession.sparkContext) - val useKMeansDAL = isPlatformSupported && $(distanceMeasure) == "euclidean" && !handleWeight + val useKMeansDAL = Utils.isOAPEnabled() && isPlatformSupported && + $(distanceMeasure) == "euclidean" && !handleWeight val model = if (useKMeansDAL) { trainWithDAL(instances, handlePersistence) diff --git a/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/feature/PCA.scala b/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/feature/PCA.scala index 14e9a2ce1..8ca5a9b5d 100644 --- a/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/feature/PCA.scala +++ b/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/feature/PCA.scala @@ -95,12 +95,11 @@ class PCA @Since("1.5.0") ( require($(k) <= numFeatures, s"source vector size $numFeatures must be no less than k=$k") - val sc = dataset.sparkSession.sparkContext val isPlatformSupported = Utils.checkClusterPlatformCompatibility( dataset.sparkSession.sparkContext) // Call oneDAL Correlation PCA implementation when numFeatures < 65535 and fall back otherwise - val parentModel = if (numFeatures < 65535 && isPlatformSupported) { + val parentModel = if (numFeatures < 65535 && Utils.isOAPEnabled() && isPlatformSupported) { val executor_num = Utils.sparkExecutorNum(dataset.sparkSession.sparkContext) val executor_cores = Utils.sparkExecutorCores() val pca = new PCADALImpl(k = $(k), executor_num, executor_cores) diff --git a/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/recommendation/ALS.scala index e59c642c9..71195397b 100644 --- a/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -922,7 +922,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { val isPlatformSupported = DALImplUtils.checkClusterPlatformCompatibility(ratings.sparkContext) val (userIdAndFactors, itemIdAndFactors) = - if (implicitPrefs && isPlatformSupported) { + if (implicitPrefs && DALImplUtils.isOAPEnabled() && isPlatformSupported) { new ALSDALImpl(ratings, rank, maxIter, regParam, alpha, seed).train() } else { trainMLlib(ratings, rank, numUserBlocks, numItemBlocks, maxIter, regParam, implicitPrefs,