[ML-184] Fix code style issues (#195)
* check code style

Signed-off-by: minmingzhu <[email protected]>

* update cpp code style

Signed-off-by: minmingzhu <[email protected]>

* update scala code style

Signed-off-by: minmingzhu <[email protected]>

* update code style

Signed-off-by: minmingzhu <[email protected]>

* update code style

Signed-off-by: minmingzhu <[email protected]>

* update ALS code style

Signed-off-by: minmingzhu <[email protected]>

* update code style

Signed-off-by: minmingzhu <[email protected]>

* update style

Signed-off-by: minmingzhu <[email protected]>
minmingzhu authored Mar 24, 2022
1 parent 594b9f7 commit d57cc82
Showing 30 changed files with 589 additions and 1,139 deletions.
8 changes: 5 additions & 3 deletions mllib-dal/src/main/java/com/intel/oap/mllib/LibLoader.java
@@ -46,8 +46,9 @@ public static String getTempSubDir() {
* Load all native libs
*/
public static synchronized void loadLibraries() throws IOException {
if (isLoaded)
if (isLoaded) {
return;
}

if (!loadLibSYCL()) {
log.debug("SYCL libraries are not available, will load CPU libraries only.");
@@ -80,7 +81,8 @@ private static synchronized void loadLibCCL() throws IOException {
*/
private static synchronized Boolean loadLibSYCL() throws IOException {
// Check if SYCL libraries are available
InputStream streamIn = LibLoader.class.getResourceAsStream(LIBRARY_PATH_IN_JAR + "/libsycl.so.5");
InputStream streamIn = LibLoader.class.getResourceAsStream(LIBRARY_PATH_IN_JAR +
"/libsycl.so.5");
if (streamIn == null) {
return false;
}
@@ -160,7 +162,7 @@ private static void loadFromJar(String path, String name) throws IOException {
streamIn.close();
}

System.load(fileOut.toString());
System.load(fileOut.toString());
log.debug("DONE: Loading library " + fileOut.toString() +" as resource.");
}

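For context on the hunks above: loadFromJar copies a native library that ships inside the JAR out to a temporary file and then hands that path to System.load. A minimal, self-contained Java sketch of the same pattern follows; the class name, temp-file handling, and error message are illustrative assumptions, not the project's actual LibLoader.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper (not the project's LibLoader): extract a bundled native
// library from the JAR to a temp file, then load it by absolute path.
public class NativeLibSketch {
    public static void loadFromJar(String pathInJar, String name) throws IOException {
        try (InputStream in = NativeLibSketch.class.getResourceAsStream(pathInJar + "/" + name)) {
            if (in == null) {
                throw new IOException("Native library not found in JAR: " + pathInJar + "/" + name);
            }
            Path out = Files.createTempFile("native-", "-" + name);
            Files.copy(in, out, StandardCopyOption.REPLACE_EXISTING);
            // Same idea as System.load(fileOut.toString()) in the diff above.
            System.load(out.toAbsolutePath().toString());
        }
    }
}
```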
264 changes: 132 additions & 132 deletions mllib-dal/src/main/native/CorrelationDALImpl.cpp
@@ -31,117 +31,118 @@ using namespace daal::algorithms;

typedef double algorithmFPType; /* Algorithm floating-point type */

static void correlation_compute(JNIEnv *env,
jobject obj,
int rankId,
ccl::communicator &comm,
const NumericTablePtr &pData,
size_t nBlocks,
jobject resultObj) {
using daal::byte;
auto t1 = std::chrono::high_resolution_clock::now();

const bool isRoot = (rankId == ccl_root);

covariance::Distributed<step1Local, algorithmFPType> localAlgorithm;

/* Set the input data set to the algorithm */
localAlgorithm.input.set(covariance::data, pData);

/* Compute covariance */
localAlgorithm.compute();

auto t2 = std::chrono::high_resolution_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::seconds>(t2 - t1).count();
std::cout << "Correleation (native): local step took " << duration << " secs"
<< std::endl;

t1 = std::chrono::high_resolution_clock::now();

/* Serialize partial results required by step 2 */
services::SharedPtr<byte> serializedData;
InputDataArchive dataArch;
localAlgorithm.getPartialResult()->serialize(dataArch);
size_t perNodeArchLength = dataArch.getSizeOfArchive();

serializedData =
services::SharedPtr<byte>(new byte[perNodeArchLength * nBlocks]);

byte *nodeResults = new byte[perNodeArchLength];
dataArch.copyArchiveToArray(nodeResults, perNodeArchLength);
std::vector<size_t> aReceiveCount(comm.size(),
perNodeArchLength); // 4 x "14016"

/* Transfer partial results to step 2 on the root node */
ccl::gather((int8_t *)nodeResults, perNodeArchLength,
(int8_t *)(serializedData.get()), perNodeArchLength, comm)
.wait();
t2 = std::chrono::high_resolution_clock::now();

duration =
std::chrono::duration_cast<std::chrono::seconds>(t2 - t1).count();
std::cout << "Correleation (native): ccl_allgatherv took " << duration << " secs"
<< std::endl;
if (isRoot) {
auto t1 = std::chrono::high_resolution_clock::now();
/* Create an algorithm to compute covariance on the master node */
covariance::Distributed<step2Master, algorithmFPType> masterAlgorithm;

for (size_t i = 0; i < nBlocks; i++) {
/* Deserialize partial results from step 1 */
OutputDataArchive dataArch(serializedData.get() +
perNodeArchLength * i,
perNodeArchLength);

covariance::PartialResultPtr dataForStep2FromStep1(new covariance::PartialResult());
dataForStep2FromStep1->deserialize(dataArch);

/* Set local partial results as input for the master-node algorithm
*/
masterAlgorithm.input.add(covariance::partialResults,
dataForStep2FromStep1);
}

/* Set the parameter to choose the type of the output matrix */
masterAlgorithm.parameter.outputMatrixType = covariance::correlationMatrix;

/* Merge and finalizeCompute covariance decomposition on the master node */
masterAlgorithm.compute();
masterAlgorithm.finalizeCompute();

/* Retrieve the algorithm results */
covariance::ResultPtr result = masterAlgorithm.getResult();
auto t2 = std::chrono::high_resolution_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::seconds>(t2 - t1).count();
std::cout << "Correlation (native): master step took " << duration << " secs"
<< std::endl;

/* Print the results */
printNumericTable(result->get(covariance::correlation),
"Correlation first 20 columns of "
"correlation matrix:",
1, 20);
static void correlation_compute(JNIEnv *env, jobject obj, int rankId,
ccl::communicator &comm,
const NumericTablePtr &pData, size_t nBlocks,
jobject resultObj) {
using daal::byte;
auto t1 = std::chrono::high_resolution_clock::now();

const bool isRoot = (rankId == ccl_root);

covariance::Distributed<step1Local, algorithmFPType> localAlgorithm;

/* Set the input data set to the algorithm */
localAlgorithm.input.set(covariance::data, pData);

/* Compute covariance */
localAlgorithm.compute();

auto t2 = std::chrono::high_resolution_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::seconds>(t2 - t1).count();
std::cout << "Correleation (native): local step took " << duration
<< " secs" << std::endl;

t1 = std::chrono::high_resolution_clock::now();

/* Serialize partial results required by step 2 */
services::SharedPtr<byte> serializedData;
InputDataArchive dataArch;
localAlgorithm.getPartialResult()->serialize(dataArch);
size_t perNodeArchLength = dataArch.getSizeOfArchive();

serializedData =
services::SharedPtr<byte>(new byte[perNodeArchLength * nBlocks]);

byte *nodeResults = new byte[perNodeArchLength];
dataArch.copyArchiveToArray(nodeResults, perNodeArchLength);
std::vector<size_t> aReceiveCount(comm.size(),
perNodeArchLength); // 4 x "14016"

/* Transfer partial results to step 2 on the root node */
ccl::gather((int8_t *)nodeResults, perNodeArchLength,
(int8_t *)(serializedData.get()), perNodeArchLength, comm)
.wait();
t2 = std::chrono::high_resolution_clock::now();

duration =
std::chrono::duration_cast<std::chrono::seconds>(t2 - t1).count();
std::cout << "Correleation (native): ccl_allgatherv took " << duration
<< " secs" << std::endl;
if (isRoot) {
auto t1 = std::chrono::high_resolution_clock::now();
/* Create an algorithm to compute covariance on the master node */
covariance::Distributed<step2Master, algorithmFPType> masterAlgorithm;

for (size_t i = 0; i < nBlocks; i++) {
/* Deserialize partial results from step 1 */
OutputDataArchive dataArch(serializedData.get() +
perNodeArchLength * i,
perNodeArchLength);

covariance::PartialResultPtr dataForStep2FromStep1(
new covariance::PartialResult());
dataForStep2FromStep1->deserialize(dataArch);

/* Set local partial results as input for the master-node algorithm
*/
masterAlgorithm.input.add(covariance::partialResults,
dataForStep2FromStep1);
}

/* Set the parameter to choose the type of the output matrix */
masterAlgorithm.parameter.outputMatrixType =
covariance::correlationMatrix;

/* Merge and finalizeCompute covariance decomposition on the master node
*/
masterAlgorithm.compute();
masterAlgorithm.finalizeCompute();

/* Retrieve the algorithm results */
covariance::ResultPtr result = masterAlgorithm.getResult();
auto t2 = std::chrono::high_resolution_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::seconds>(t2 - t1).count();
std::cout << "Correlation (native): master step took " << duration
<< " secs" << std::endl;

/* Print the results */
printNumericTable(result->get(covariance::correlation),
"Correlation first 20 columns of "
"correlation matrix:",
1, 20);
// Return all covariance & mean
jclass clazz = env->GetObjectClass(resultObj);

// Get Field references
jfieldID correlationNumericTableField =
env->GetFieldID(clazz, "correlationNumericTable", "J");

NumericTablePtr *correlation =
new NumericTablePtr(result->get(covariance::correlation));
// Get Field references
jfieldID correlationNumericTableField =
env->GetFieldID(clazz, "correlationNumericTable", "J");

env->SetLongField(resultObj, correlationNumericTableField, (jlong)correlation);
NumericTablePtr *correlation =
new NumericTablePtr(result->get(covariance::correlation));

}
env->SetLongField(resultObj, correlationNumericTableField,
(jlong)correlation);
}
}

JNIEXPORT jlong JNICALL
Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationTrainDAL(
JNIEnv *env, jobject obj, jlong pNumTabData,
jint executor_num, jint executor_cores, jboolean use_gpu, jintArray gpu_idx_array, jobject resultObj) {
JNIEnv *env, jobject obj, jlong pNumTabData, jint executor_num,
jint executor_cores, jboolean use_gpu, jintArray gpu_idx_array,
jobject resultObj) {

ccl::communicator &comm = getComm();
size_t rankId = comm.rank();
@@ -150,43 +151,42 @@ Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationTrainDAL(

NumericTablePtr pData = *((NumericTablePtr *)pNumTabData);

#ifdef CPU_GPU_PROFILE
#ifdef CPU_GPU_PROFILE

if (use_gpu) {
int n_gpu = env->GetArrayLength(gpu_idx_array);
cout << "oneDAL (native): use GPU kernels with " << n_gpu << " GPU(s)"
<< endl;
if (use_gpu) {
int n_gpu = env->GetArrayLength(gpu_idx_array);
cout << "oneDAL (native): use GPU kernels with " << n_gpu << " GPU(s)"
<< endl;

jint *gpu_indices = env->GetIntArrayElements(gpu_idx_array, 0);
jint *gpu_indices = env->GetIntArrayElements(gpu_idx_array, 0);

int size = comm.size();
auto assigned_gpu =
getAssignedGPU(comm, size, rankId, gpu_indices, n_gpu);
int size = comm.size();
auto assigned_gpu =
getAssignedGPU(comm, size, rankId, gpu_indices, n_gpu);

// Set SYCL context
cl::sycl::queue queue(assigned_gpu);
daal::services::SyclExecutionContext ctx(queue);
daal::services::Environment::getInstance()->setDefaultExecutionContext(
ctx);
// Set SYCL context
cl::sycl::queue queue(assigned_gpu);
daal::services::SyclExecutionContext ctx(queue);
daal::services::Environment::getInstance()->setDefaultExecutionContext(
ctx);

correlation_compute(
env, obj, rankId, comm, pData, nBlocks, resultObj);
correlation_compute(env, obj, rankId, comm, pData, nBlocks, resultObj);

env->ReleaseIntArrayElements(gpu_idx_array, gpu_indices, 0);
} else
#endif
{
// Set number of threads for oneDAL to use for each rank
services::Environment::getInstance()->setNumberOfThreads(executor_cores);
env->ReleaseIntArrayElements(gpu_idx_array, gpu_indices, 0);
} else
#endif
{
// Set number of threads for oneDAL to use for each rank
services::Environment::getInstance()->setNumberOfThreads(
executor_cores);

int nThreadsNew =
services::Environment::getInstance()->getNumberOfThreads();
cout << "oneDAL (native): Number of CPU threads used: " << nThreadsNew
<< endl;
int nThreadsNew =
services::Environment::getInstance()->getNumberOfThreads();
cout << "oneDAL (native): Number of CPU threads used: " << nThreadsNew
<< endl;

correlation_compute(
env, obj, rankId, comm, pData, nBlocks, resultObj);
}
correlation_compute(env, obj, rankId, comm, pData, nBlocks, resultObj);
}

return 0;
return 0;
}
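A note on how the result above reaches the JVM: the native code allocates a NumericTablePtr on the heap and writes its address into the long field correlationNumericTable of resultObj via SetLongField. The Java sketch below mirrors that contract; the real project declares these types on the Scala side, so the class shapes here are assumptions, with only the field name/type and the JNI entry-point signature taken from the C++ code above.

```java
package com.intel.oap.mllib.stat;

// Hypothetical Java mirror of the result holder: the native side fills
// correlationNumericTable with the address of a heap-allocated NumericTablePtr.
public class CorrelationResult {
    long correlationNumericTable;
}

// Hypothetical mirror of the class behind
// Java_com_intel_oap_mllib_stat_CorrelationDALImpl_cCorrelationTrainDAL.
class CorrelationDALImpl {
    native long cCorrelationTrainDAL(long pNumTabData, int executorNum, int executorCores,
                                     boolean useGpu, int[] gpuIdxArray, CorrelationResult result);
}
```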
12 changes: 7 additions & 5 deletions mllib-dal/src/main/native/KMeansDALImpl.cpp
@@ -266,16 +266,18 @@ Java_com_intel_oap_mllib_clustering_KMeansDALImpl_cKMeansDALComputeWithInitCente
using daal::data_management::internal::convertToSyclHomogen;

Status st;
NumericTablePtr pSyclHomogen = convertToSyclHomogen<algorithmFPType>(*pData, st);
NumericTablePtr pSyclHomogen =
convertToSyclHomogen<algorithmFPType>(*pData, st);
if (!st.ok()) {
std::cout << "Failed to convert row merged table to SYCL homogen one"
<< std::endl;
std::cout
<< "Failed to convert row merged table to SYCL homogen one"
<< std::endl;
return 0L;
}

ret = doKMeansDALComputeWithInitCenters(
env, obj, rankId, comm, pSyclHomogen, centroids, cluster_num, tolerance,
iteration_num, executor_num, resultObj);
env, obj, rankId, comm, pSyclHomogen, centroids, cluster_num,
tolerance, iteration_num, executor_num, resultObj);

env->ReleaseIntArrayElements(gpu_idx_array, gpu_indices, 0);
} else
10 changes: 4 additions & 6 deletions mllib-dal/src/main/native/OneCCL.cpp
@@ -82,8 +82,8 @@ JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1init(
return 1;
}

JNIEXPORT void JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1cleanup(
JNIEnv *env, jobject obj) {
JNIEXPORT void JNICALL
Java_com_intel_oap_mllib_OneCCL_00024_c_1cleanup(JNIEnv *env, jobject obj) {

g_comms.pop_back();

@@ -176,10 +176,8 @@ static bool is_valid_ip(char ip[]) {
return false;
}

JNIEXPORT jint JNICALL
Java_com_intel_oap_mllib_OneCCL_00024_c_1getAvailPort(JNIEnv *env,
jobject obj,
jstring localIP) {
JNIEXPORT jint JNICALL Java_com_intel_oap_mllib_OneCCL_00024_c_1getAvailPort(
JNIEnv *env, jobject obj, jstring localIP) {

// start from beginning of dynamic port
const int port_start_base = 3000;