From c07d70c904146a1c84fd5a7ecd4ce185726d8119 Mon Sep 17 00:00:00 2001 From: Xiaochang Wu Date: Mon, 8 Mar 2021 15:46:44 +0800 Subject: [PATCH] [ML-33] Optimize oneCCL port detecting (#34) * Add missing build.sh * Add ALS with oneDAL backend * Add IntelALSSuite * fix shuffle_all2all func declare * Rename ALS rank to nFactors and name conflict with oneCCL rank * Fix test.sh * use repartition to workaround partition uneven * Use getifaddr instead of hostname -I * add synchronized to getAvailPort and use dataForConversion --- mllib-dal/src/main/native/OneCCL.cpp | 79 ++++++++++++++++--- .../javah/org_apache_spark_ml_util_OneCCL__.h | 4 +- .../spark/ml/clustering/KMeansDALImpl.scala | 15 ++-- .../apache/spark/ml/feature/PCADALImpl.scala | 8 +- .../spark/ml/recommendation/ALSDALImpl.scala | 19 ++--- .../org/apache/spark/ml/util/OneCCL.scala | 6 +- mllib-dal/test.sh | 6 +- 7 files changed, 93 insertions(+), 44 deletions(-) diff --git a/mllib-dal/src/main/native/OneCCL.cpp b/mllib-dal/src/main/native/OneCCL.cpp index 3927968e6..c733c7b33 100644 --- a/mllib-dal/src/main/native/OneCCL.cpp +++ b/mllib-dal/src/main/native/OneCCL.cpp @@ -6,6 +6,10 @@ #include #include +#include +#include +#include + #include #include "org_apache_spark_ml_util_OneCCL__.h" @@ -112,22 +116,71 @@ JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_setEnv return err; } -#define GET_IP_CMD "hostname -I" -#define MAX_KVS_VAL_LENGTH 130 -#define READ_ONLY "r" +static const int CCL_IP_LEN = 128; +std::list local_host_ips; + +static int fill_local_host_ip() { + struct ifaddrs *ifaddr, *ifa; + int family = AF_UNSPEC; + char local_ip[CCL_IP_LEN]; + if (getifaddrs(&ifaddr) < 0) { + // LOG_ERROR("fill_local_host_ip: can not get host IP"); + return -1; + } + + const char iface_name[] = "lo"; + local_host_ips.clear(); + + for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) { + if (ifa->ifa_addr == NULL) + continue; + if (strstr(ifa->ifa_name, iface_name) == NULL) { + family = ifa->ifa_addr->sa_family; + if (family == AF_INET) { + memset(local_ip, 0, CCL_IP_LEN); + int res = getnameinfo( + ifa->ifa_addr, + (family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6), + local_ip, + CCL_IP_LEN, + NULL, + 0, + NI_NUMERICHOST); + if (res != 0) { + std::string s("fill_local_host_ip: getnameinfo error > "); + s.append(gai_strerror(res)); + // LOG_ERROR(s.c_str()); + return -1; + } + local_host_ips.push_back(local_ip); + } + } + } + if (local_host_ips.empty()) { + // LOG_ERROR("fill_local_host_ip: can't find interface to get host IP"); + return -1; + } + // memset(local_host_ip, 0, CCL_IP_LEN); + // strncpy(local_host_ip, local_host_ips.front().c_str(), CCL_IP_LEN); + + // for (auto &ip : local_host_ips) + // cout << ip << endl; + + freeifaddrs(ifaddr); + return 0; +} static bool is_valid_ip(char ip[]) { - FILE *fp; - // TODO: use getifaddrs instead of popen - if ((fp = popen(GET_IP_CMD, READ_ONLY)) == NULL) { - printf("Can't get host IP\n"); - exit(1); + if (fill_local_host_ip() == -1) { + std::cerr << "fill_local_host_ip error" << std::endl; + }; + for (std::list::iterator it = local_host_ips.begin(); it != local_host_ips.end(); ++it) { + if (*it == ip) { + return true; } - char host_ips[MAX_KVS_VAL_LENGTH]; - fgets(host_ips, MAX_KVS_VAL_LENGTH, fp); - pclose(fp); + } - return strstr(host_ips, ip) ? true : false; + return false; } /* @@ -135,7 +188,7 @@ static bool is_valid_ip(char ip[]) { * Method: getAvailPort * Signature: (Ljava/lang/String;)I */ -JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_getAvailPort +JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_c_1getAvailPort (JNIEnv *env, jobject obj, jstring localIP) { // start from beginning of dynamic port diff --git a/mllib-dal/src/main/native/javah/org_apache_spark_ml_util_OneCCL__.h b/mllib-dal/src/main/native/javah/org_apache_spark_ml_util_OneCCL__.h index 52e6691ee..580c34bf9 100644 --- a/mllib-dal/src/main/native/javah/org_apache_spark_ml_util_OneCCL__.h +++ b/mllib-dal/src/main/native/javah/org_apache_spark_ml_util_OneCCL__.h @@ -49,10 +49,10 @@ JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_setEnv /* * Class: org_apache_spark_ml_util_OneCCL__ - * Method: getAvailPort + * Method: c_getAvailPort * Signature: (Ljava/lang/String;)I */ -JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_getAvailPort +JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_c_1getAvailPort (JNIEnv *, jobject, jstring); #ifdef __cplusplus diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala index f531b46a5..e9e7ec36d 100644 --- a/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala +++ b/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala @@ -40,14 +40,6 @@ class KMeansDALImpl ( instr.foreach(_.logInfo(s"Processing partitions with $executorNum executors")) - val executorIPAddress = Utils.sparkFirstExecutorIP(data.sparkContext) - val kvsIP = data.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress) - - val kvsPortDetected = Utils.checkExecutorAvailPort(data, kvsIP) - val kvsPort = data.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port", kvsPortDetected) - - val kvsIPPort = kvsIP+"_"+kvsPort - // repartition to executorNum if not enough partitions val dataForConversion = if (data.getNumPartitions < executorNum) { data.repartition(executorNum).setName("Repartitioned for conversion").cache() @@ -55,6 +47,13 @@ class KMeansDALImpl ( data } + val executorIPAddress = Utils.sparkFirstExecutorIP(dataForConversion.sparkContext) + val kvsIP = dataForConversion.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress) + val kvsPortDetected = Utils.checkExecutorAvailPort(dataForConversion, kvsIP) + val kvsPort = dataForConversion.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port", kvsPortDetected) + + val kvsIPPort = kvsIP+"_"+kvsPort + val partitionDims = Utils.getPartitionDims(dataForConversion) // filter the empty partitions diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala index 1b3f9ddf0..e1bba3d37 100644 --- a/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala +++ b/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala @@ -47,11 +47,11 @@ class PCADALImpl ( val coalescedTables = OneDAL.rddVectorToNumericTables(normalizedData, executorNum) - val executorIPAddress = Utils.sparkFirstExecutorIP(data.sparkContext) - val kvsIP = data.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress) + val executorIPAddress = Utils.sparkFirstExecutorIP(coalescedTables.sparkContext) + val kvsIP = coalescedTables.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress) - val kvsPortDetected = Utils.checkExecutorAvailPort(data, kvsIP) - val kvsPort = data.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port", kvsPortDetected) + val kvsPortDetected = Utils.checkExecutorAvailPort(coalescedTables, kvsIP) + val kvsPort = coalescedTables.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port", kvsPortDetected) val kvsIPPort = kvsIP+"_"+kvsPort diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/recommendation/ALSDALImpl.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/recommendation/ALSDALImpl.scala index 1e73b5308..bcb95ca1f 100644 --- a/mllib-dal/src/main/scala/org/apache/spark/ml/recommendation/ALSDALImpl.scala +++ b/mllib-dal/src/main/scala/org/apache/spark/ml/recommendation/ALSDALImpl.scala @@ -205,23 +205,16 @@ class ALSDALImpl[@specialized(Int, Long) ID: ClassTag]( logInfo(s"ALSDAL fit using $executorNum Executors for $nVectors vectors and $nFeatures features") - val executorIPAddress = Utils.sparkFirstExecutorIP(data.sparkContext) - val kvsIP = data.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress) + val numericTables = data.repartition(executorNum).setName("Repartitioned for conversion").cache() - val kvsPortDetected = Utils.checkExecutorAvailPort(data, kvsIP) - val kvsPort = data.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port", kvsPortDetected) + val executorIPAddress = Utils.sparkFirstExecutorIP(numericTables.sparkContext) + val kvsIP = numericTables.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress) - val kvsIPPort = kvsIP+"_"+kvsPort + val kvsPortDetected = Utils.checkExecutorAvailPort(numericTables, kvsIP) + val kvsPort = numericTables.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port", kvsPortDetected) - val numericTables = data.repartition(executorNum).setName("Repartitioned for conversion").cache() + val kvsIPPort = kvsIP+"_"+kvsPort -/* - val numericTables = if (data.getNumPartitions < executorNum) { - data.repartition(executorNum).setName("Repartitioned for conversion").cache() - } else { - data.coalesce(executorNum).setName("Coalesced for conversion").cache() - } -*/ val results = numericTables // Transpose the dataset .map { p => diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneCCL.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneCCL.scala index 32b66a247..7581a1003 100644 --- a/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneCCL.scala +++ b/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneCCL.scala @@ -50,6 +50,10 @@ object OneCCL extends Logging { c_cleanup() } + def getAvailPort(localIP: String): Int = synchronized { + c_getAvailPort(localIP) + } + @native private def c_init(size: Int, rank: Int, ip_port: String, param: CCLParam) : Int @native private def c_cleanup() : Unit @@ -57,5 +61,5 @@ object OneCCL extends Logging { @native def rankID() : Int @native def setEnv(key: String, value: String, overwrite: Boolean = true): Int - @native def getAvailPort(localIP: String): Int + @native def c_getAvailPort(localIP: String): Int } \ No newline at end of file diff --git a/mllib-dal/test.sh b/mllib-dal/test.sh index b4ae95035..0157c22a4 100755 --- a/mllib-dal/test.sh +++ b/mllib-dal/test.sh @@ -35,9 +35,9 @@ export LD_PRELOAD=$JAVA_HOME/jre/lib/amd64/libjsig.so # -Dtest=none to turn off the Java tests # Test all -mvn -Dtest=none -Dmaven.test.skip=false test +# mvn -Dtest=none -Dmaven.test.skip=false test # Individual test -# mvn -Dtest=none -DwildcardSuites=org.apache.spark.ml.clustering.IntelKMeansSuite test -# mvn -Dtest=none -DwildcardSuites=org.apache.spark.ml.feature.IntelPCASuite test +mvn -Dtest=none -DwildcardSuites=org.apache.spark.ml.clustering.IntelKMeansSuite test +mvn -Dtest=none -DwildcardSuites=org.apache.spark.ml.feature.IntelPCASuite test # mvn -Dtest=none -DwildcardSuites=org.apache.spark.ml.recommendation.IntelALSSuite test