Skip to content

Commit

Permalink
[Java]Java hybird test build (#45)
Browse files Browse the repository at this point in the history
* fix java test build

* import sys for hybird stream test python

* new a jni extractor jni for libstreaming java so
After this jni util imported, we still fail to run hybird stream test.
From test output logs, boost::asio::detail::epoll_reactor::interrupt
crash thrown outside, which shall not show that rootcasue and problem
details

* append jna maven deps for jni util class

* remove duplicated sys imported

* lease hybird python stream test
  • Loading branch information
ashione authored Apr 19, 2022
1 parent 1f976ee commit 910100d
Show file tree
Hide file tree
Showing 9 changed files with 180 additions and 24 deletions.
1 change: 1 addition & 0 deletions streaming/java/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ define_java_module(
"@maven//:com_google_protobuf_protobuf_java",
"@maven//:commons_io_commons_io",
"@maven//:de_ruedigermoeller_fst",
"@maven//:net_java_dev_jna_jna",
"@maven//:org_apache_commons_commons_lang3",
"@maven//:org_apache_logging_log4j_log4j_api",
"@maven//:org_apache_logging_log4j_log4j_core",
Expand Down
1 change: 1 addition & 0 deletions streaming/java/checkstyle-suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<suppress files=".*" checks="RequireEmptyLineBeforeBlockTagGroup"/>
<suppress files=".*" checks="SingleLineJavadoc"/>
<suppress files=".*" checks="VariableDeclarationUsageDistance"/>
<suppress files=".*" checks="LeftCurly"/>
<!-- suppress check for flatbuffer-generated files. -->
<suppress checks=".*" files="io[\\/]ray[\\/]streaming[\\/]runtime[\\/]generated[\\/]"/>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,16 @@ public Record deserialize(byte[] bytes) {
List list = (List) msgPackSerializer.deserialize(bytes);
Byte typeId = (Byte) list.get(0);
switch (typeId) {
case RECORD_TYPE_ID: {
case RECORD_TYPE_ID:
{
String stream = (String) list.get(1);
Object value = list.get(2);
Record record = new Record(value);
record.setStream(stream);
return record;
}
case KEY_RECORD_TYPE_ID: {
case KEY_RECORD_TYPE_ID:
{
String stream = (String) list.get(1);
Object key = list.get(2);
Object value = list.get(3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class TransferHandler {

static {
JniUtils.loadLibrary(BinaryFileUtil.CORE_WORKER_JAVA_LIBRARY, true);
JniUtils.loadLibrary("streaming_java");
io.ray.streaming.runtime.util.JniUtils.loadLibrary("streaming_java");
}

private long writerClientNative;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package io.ray.streaming.runtime.util;

import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.SystemUtils;

public class BinaryFileUtil {

public static final String STREAMING_JAVA_LIBRARY = "streaming_java";

/**
* Extract a platform-native resource file to <code>destDir</code>. Note that this a process-safe
* operation. If multi processes extract the file to same directory concurrently, this operation
* will be protected by a file lock.
*
* @param destDir a directory to extract resource file to
* @param fileName resource file name
* @return extracted resource file
*/
public static File getNativeFile(String destDir, String fileName) {
final File dir = new File(destDir);
if (!dir.exists()) {
try {
FileUtils.forceMkdir(dir);
} catch (IOException e) {
throw new RuntimeException("Couldn't make directory: " + dir.getAbsolutePath(), e);
}
}
String lockFilePath = destDir + File.separator + "file_lock";
try (FileLock ignored = new RandomAccessFile(lockFilePath, "rw").getChannel().lock()) {
String resourceDir;
if (SystemUtils.IS_OS_MAC) {
resourceDir = "native/darwin/";
} else if (SystemUtils.IS_OS_LINUX) {
resourceDir = "native/linux/";
} else {
throw new UnsupportedOperationException("Unsupported os " + SystemUtils.OS_NAME);
}
/// File doesn't exist. Create a temp file and then rename it.
final String tempFilePath = String.format("%s/%s.tmp", destDir, fileName);
// Adding a temporary file here is used to fix the issue that when
// a java worker crashes during extracting dynamic library file, next
// java worker will use an incomplete file. The issue link is:
//
// https://github.com/ray-project/ray/issues/19341
File tempFile = new File(tempFilePath);

String resourcePath = resourceDir + fileName;
File destFile = new File(String.format("%s/%s", destDir, fileName));
if (destFile.exists()) {
return destFile;
}

// File does not exist.
try (InputStream is = BinaryFileUtil.class.getResourceAsStream("/" + resourcePath)) {
Preconditions.checkNotNull(is, "{} doesn't exist.", resourcePath);
Files.copy(is, Paths.get(tempFile.getCanonicalPath()), StandardCopyOption.REPLACE_EXISTING);
if (!tempFile.renameTo(destFile)) {
throw new RuntimeException(
String.format(
"Couldn't rename temp file(%s) to %s",
tempFile.getAbsolutePath(), destFile.getAbsolutePath()));
}
return destFile;
} catch (IOException e) {
throw new RuntimeException("Couldn't get temp file from resource " + resourcePath, e);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public static String getHostName() {

public static void loadNativeLibraries() {
JniUtils.loadLibrary(BinaryFileUtil.CORE_WORKER_JAVA_LIBRARY, true);
JniUtils.loadLibrary("streaming_java");
io.ray.streaming.runtime.util.JniUtils.loadLibrary("streaming_java");
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package io.ray.streaming.runtime.util;

import com.google.common.collect.Sets;
import com.sun.jna.NativeLibrary;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JniUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(JniUtils.class);
private static Set<String> loadedLibs = Sets.newHashSet();
private static String defaultDestDir;

/**
* Loads the native library specified by the <code>libraryName</code> argument. The <code>
* libraryName</code> argument must not contain any platform specific prefix, file extension or
* path.
*
* @param libraryName the name of the library.
*/
public static synchronized void loadLibrary(String libraryName) {
loadLibrary(getDefaultDestDir(), libraryName);
}

/**
* Loads the native library specified by the <code>libraryName</code> argument. The <code>
* libraryName</code> argument must not contain any platform specific prefix, file extension or
* path.
*
* @param libraryName the name of the library.
* @param exportSymbols export symbols of library so that it can be used by other libs.
*/
public static synchronized void loadLibrary(String libraryName, boolean exportSymbols) {
loadLibrary(getDefaultDestDir(), libraryName, exportSymbols);
}

/**
* Loads the native library specified by the <code>libraryName</code> argument. The <code>
* libraryName</code> argument must not contain any platform specific prefix, file extension or
* path.
*
* @param destDir The destination dir the library to be extracted.
* @param libraryName the name of the library.
*/
public static synchronized void loadLibrary(String destDir, String libraryName) {
loadLibrary(destDir, libraryName, false);
}

/**
* Loads the native library specified by the <code>libraryName</code> argument. The <code>
* libraryName</code> argument must not contain any platform specific prefix, file extension or
* path.
*
* @param destDir The destination dir the library to be extracted.
* @param libraryName the name of the library.
* @param exportSymbols export symbols of library so that it can be used by other libs.
*/
public static synchronized void loadLibrary(
String destDir, String libraryName, boolean exportSymbols) {
if (!loadedLibs.contains(libraryName)) {
LOGGER.debug("Loading native library {} in {}.", libraryName, destDir);
// Load native library.
String fileName = System.mapLibraryName(libraryName);
final File file = BinaryFileUtil.getNativeFile(destDir, fileName);

if (exportSymbols) {
// Expose library symbols using RTLD_GLOBAL which may be depended by other shared
// libraries.
NativeLibrary.getInstance(file.getAbsolutePath());
}
System.load(file.getAbsolutePath());
LOGGER.debug("Native library loaded.");
loadedLibs.add(libraryName);
}
}

/** Cache the result so that multiple calls return the same dest dir. */
private static synchronized String getDefaultDestDir() {
if (defaultDestDir == null) {
try {
defaultDestDir = Files.createTempDirectory("native_libs").toString();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return defaultDestDir;
}
}
15 changes: 1 addition & 14 deletions streaming/java/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,8 @@ popd
echo "build ray streaming"
bazel build @com_github_ray_streaming//java:all

# Check that ray libstreaming_java doesn't include symbols from ray by accident.
# Otherwise the symbols may conflict.
symbols_conflict=$(nm bazel-bin/streaming/libstreaming_java.so | grep TaskFinisherInterface || true)
if [ -n "${symbols_conflict}" ]; then
echo "streaming should not include symbols from ray: ${symbols_conflict}"
exit 1
fi

echo "Running streaming tests."
java -cp "$ROOT_DIR"/../../bazel-bin/streaming/java/all_streaming_tests_deploy.jar\
java -cp "$ROOT_DIR"/../bazel-bin/java/all_streaming_tests_deploy.jar\
org.testng.TestNG -d /tmp/ray_streaming_java_test_output "$ROOT_DIR"/testng.xml ||
exit_code=$?
if [ -z ${exit_code+x} ]; then
Expand Down Expand Up @@ -63,11 +55,6 @@ if [ $exit_code -ne 2 ] && [ $exit_code -ne 0 ] ; then
fi

echo "Testing maven install."
cd "$ROOT_DIR"/../../java
echo "build ray maven deps"
bazel build gen_maven_deps
echo "maven install ray"
mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean install -DskipTests -Dcheckstyle.skip
cd "$ROOT_DIR"
echo "maven install ray streaming"
mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean install -DskipTests -Dcheckstyle.skip
6 changes: 0 additions & 6 deletions streaming/python/raystreaming/tests/test_hybrid_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@ def sink_func1(x):
print("HybridStreamTest sink_func1 value:", x)


@pytest.mark.skip(
reason=(
"We cannot fetch libstreaming_java.so from ray_dist.jar,"
+ "which will fixed later."
)
)
def test_hybrid_stream():
subprocess.check_call(
[
Expand Down

0 comments on commit 910100d

Please sign in to comment.