diff --git a/streaming/java/BUILD.bazel b/streaming/java/BUILD.bazel
index 8190e495..2d59435e 100644
--- a/streaming/java/BUILD.bazel
+++ b/streaming/java/BUILD.bazel
@@ -147,6 +147,7 @@ define_java_module(
"@maven//:com_google_protobuf_protobuf_java",
"@maven//:commons_io_commons_io",
"@maven//:de_ruedigermoeller_fst",
+ "@maven//:net_java_dev_jna_jna",
"@maven//:org_apache_commons_commons_lang3",
"@maven//:org_apache_logging_log4j_log4j_api",
"@maven//:org_apache_logging_log4j_log4j_core",
diff --git a/streaming/java/checkstyle-suppressions.xml b/streaming/java/checkstyle-suppressions.xml
index 8a4fd5b4..62ef9d4f 100644
--- a/streaming/java/checkstyle-suppressions.xml
+++ b/streaming/java/checkstyle-suppressions.xml
@@ -16,6 +16,7 @@
+
diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/serialization/CrossLangSerializer.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/serialization/CrossLangSerializer.java
index ef258a7e..ee987146 100644
--- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/serialization/CrossLangSerializer.java
+++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/serialization/CrossLangSerializer.java
@@ -38,14 +38,16 @@ public Record deserialize(byte[] bytes) {
List list = (List) msgPackSerializer.deserialize(bytes);
Byte typeId = (Byte) list.get(0);
switch (typeId) {
- case RECORD_TYPE_ID: {
+ case RECORD_TYPE_ID:
+ {
String stream = (String) list.get(1);
Object value = list.get(2);
Record record = new Record(value);
record.setStream(stream);
return record;
}
- case KEY_RECORD_TYPE_ID: {
+ case KEY_RECORD_TYPE_ID:
+ {
String stream = (String) list.get(1);
Object key = list.get(2);
Object value = list.get(3);
diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/TransferHandler.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/TransferHandler.java
index 816273d8..a3f40516 100644
--- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/TransferHandler.java
+++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/TransferHandler.java
@@ -11,7 +11,7 @@ public class TransferHandler {
static {
JniUtils.loadLibrary(BinaryFileUtil.CORE_WORKER_JAVA_LIBRARY, true);
- JniUtils.loadLibrary("streaming_java");
+ io.ray.streaming.runtime.util.JniUtils.loadLibrary("streaming_java");
}
private long writerClientNative;
diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/BinaryFileUtil.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/BinaryFileUtil.java
new file mode 100644
index 00000000..4717b506
--- /dev/null
+++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/BinaryFileUtil.java
@@ -0,0 +1,80 @@
+package io.ray.streaming.runtime.util;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileLock;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.SystemUtils;
+
+public class BinaryFileUtil {
+
+ public static final String STREAMING_JAVA_LIBRARY = "streaming_java";
+
+ /**
+ * Extract a platform-native resource file to destDir
. Note that this a process-safe
+ * operation. If multi processes extract the file to same directory concurrently, this operation
+ * will be protected by a file lock.
+ *
+ * @param destDir a directory to extract resource file to
+ * @param fileName resource file name
+ * @return extracted resource file
+ */
+ public static File getNativeFile(String destDir, String fileName) {
+ final File dir = new File(destDir);
+ if (!dir.exists()) {
+ try {
+ FileUtils.forceMkdir(dir);
+ } catch (IOException e) {
+ throw new RuntimeException("Couldn't make directory: " + dir.getAbsolutePath(), e);
+ }
+ }
+ String lockFilePath = destDir + File.separator + "file_lock";
+ try (FileLock ignored = new RandomAccessFile(lockFilePath, "rw").getChannel().lock()) {
+ String resourceDir;
+ if (SystemUtils.IS_OS_MAC) {
+ resourceDir = "native/darwin/";
+ } else if (SystemUtils.IS_OS_LINUX) {
+ resourceDir = "native/linux/";
+ } else {
+ throw new UnsupportedOperationException("Unsupported os " + SystemUtils.OS_NAME);
+ }
+ /// File doesn't exist. Create a temp file and then rename it.
+ final String tempFilePath = String.format("%s/%s.tmp", destDir, fileName);
+ // Adding a temporary file here is used to fix the issue that when
+ // a java worker crashes during extracting dynamic library file, next
+ // java worker will use an incomplete file. The issue link is:
+ //
+ // https://github.com/ray-project/ray/issues/19341
+ File tempFile = new File(tempFilePath);
+
+ String resourcePath = resourceDir + fileName;
+ File destFile = new File(String.format("%s/%s", destDir, fileName));
+ if (destFile.exists()) {
+ return destFile;
+ }
+
+ // File does not exist.
+ try (InputStream is = BinaryFileUtil.class.getResourceAsStream("/" + resourcePath)) {
+ Preconditions.checkNotNull(is, "{} doesn't exist.", resourcePath);
+ Files.copy(is, Paths.get(tempFile.getCanonicalPath()), StandardCopyOption.REPLACE_EXISTING);
+ if (!tempFile.renameTo(destFile)) {
+ throw new RuntimeException(
+ String.format(
+ "Couldn't rename temp file(%s) to %s",
+ tempFile.getAbsolutePath(), destFile.getAbsolutePath()));
+ }
+ return destFile;
+ } catch (IOException e) {
+ throw new RuntimeException("Couldn't get temp file from resource " + resourcePath, e);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/EnvUtil.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/EnvUtil.java
index 29ac29f4..d097f13e 100644
--- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/EnvUtil.java
+++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/EnvUtil.java
@@ -30,7 +30,7 @@ public static String getHostName() {
public static void loadNativeLibraries() {
JniUtils.loadLibrary(BinaryFileUtil.CORE_WORKER_JAVA_LIBRARY, true);
- JniUtils.loadLibrary("streaming_java");
+ io.ray.streaming.runtime.util.JniUtils.loadLibrary("streaming_java");
}
/**
diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/JniUtils.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/JniUtils.java
new file mode 100644
index 00000000..666e3c37
--- /dev/null
+++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/JniUtils.java
@@ -0,0 +1,91 @@
+package io.ray.streaming.runtime.util;
+
+import com.google.common.collect.Sets;
+import com.sun.jna.NativeLibrary;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JniUtils {
+ private static final Logger LOGGER = LoggerFactory.getLogger(JniUtils.class);
+ private static Set loadedLibs = Sets.newHashSet();
+ private static String defaultDestDir;
+
+ /**
+ * Loads the native library specified by the libraryName
argument. The
+ * libraryName
argument must not contain any platform specific prefix, file extension or
+ * path.
+ *
+ * @param libraryName the name of the library.
+ */
+ public static synchronized void loadLibrary(String libraryName) {
+ loadLibrary(getDefaultDestDir(), libraryName);
+ }
+
+ /**
+ * Loads the native library specified by the libraryName
argument. The
+ * libraryName
argument must not contain any platform specific prefix, file extension or
+ * path.
+ *
+ * @param libraryName the name of the library.
+ * @param exportSymbols export symbols of library so that it can be used by other libs.
+ */
+ public static synchronized void loadLibrary(String libraryName, boolean exportSymbols) {
+ loadLibrary(getDefaultDestDir(), libraryName, exportSymbols);
+ }
+
+ /**
+ * Loads the native library specified by the libraryName
argument. The
+ * libraryName
argument must not contain any platform specific prefix, file extension or
+ * path.
+ *
+ * @param destDir The destination dir the library to be extracted.
+ * @param libraryName the name of the library.
+ */
+ public static synchronized void loadLibrary(String destDir, String libraryName) {
+ loadLibrary(destDir, libraryName, false);
+ }
+
+ /**
+ * Loads the native library specified by the libraryName
argument. The
+ * libraryName
argument must not contain any platform specific prefix, file extension or
+ * path.
+ *
+ * @param destDir The destination dir the library to be extracted.
+ * @param libraryName the name of the library.
+ * @param exportSymbols export symbols of library so that it can be used by other libs.
+ */
+ public static synchronized void loadLibrary(
+ String destDir, String libraryName, boolean exportSymbols) {
+ if (!loadedLibs.contains(libraryName)) {
+ LOGGER.debug("Loading native library {} in {}.", libraryName, destDir);
+ // Load native library.
+ String fileName = System.mapLibraryName(libraryName);
+ final File file = BinaryFileUtil.getNativeFile(destDir, fileName);
+
+ if (exportSymbols) {
+ // Expose library symbols using RTLD_GLOBAL which may be depended by other shared
+ // libraries.
+ NativeLibrary.getInstance(file.getAbsolutePath());
+ }
+ System.load(file.getAbsolutePath());
+ LOGGER.debug("Native library loaded.");
+ loadedLibs.add(libraryName);
+ }
+ }
+
+ /** Cache the result so that multiple calls return the same dest dir. */
+ private static synchronized String getDefaultDestDir() {
+ if (defaultDestDir == null) {
+ try {
+ defaultDestDir = Files.createTempDirectory("native_libs").toString();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return defaultDestDir;
+ }
+}
diff --git a/streaming/java/test.sh b/streaming/java/test.sh
index 4b557afb..df48ee54 100755
--- a/streaming/java/test.sh
+++ b/streaming/java/test.sh
@@ -18,16 +18,8 @@ popd
echo "build ray streaming"
bazel build @com_github_ray_streaming//java:all
-# Check that ray libstreaming_java doesn't include symbols from ray by accident.
-# Otherwise the symbols may conflict.
-symbols_conflict=$(nm bazel-bin/streaming/libstreaming_java.so | grep TaskFinisherInterface || true)
-if [ -n "${symbols_conflict}" ]; then
- echo "streaming should not include symbols from ray: ${symbols_conflict}"
- exit 1
-fi
-
echo "Running streaming tests."
-java -cp "$ROOT_DIR"/../../bazel-bin/streaming/java/all_streaming_tests_deploy.jar\
+java -cp "$ROOT_DIR"/../bazel-bin/java/all_streaming_tests_deploy.jar\
org.testng.TestNG -d /tmp/ray_streaming_java_test_output "$ROOT_DIR"/testng.xml ||
exit_code=$?
if [ -z ${exit_code+x} ]; then
@@ -63,11 +55,6 @@ if [ $exit_code -ne 2 ] && [ $exit_code -ne 0 ] ; then
fi
echo "Testing maven install."
-cd "$ROOT_DIR"/../../java
-echo "build ray maven deps"
-bazel build gen_maven_deps
-echo "maven install ray"
-mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean install -DskipTests -Dcheckstyle.skip
cd "$ROOT_DIR"
echo "maven install ray streaming"
mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean install -DskipTests -Dcheckstyle.skip
diff --git a/streaming/python/raystreaming/tests/test_hybrid_stream.py b/streaming/python/raystreaming/tests/test_hybrid_stream.py
index a850b3e7..715e0e66 100644
--- a/streaming/python/raystreaming/tests/test_hybrid_stream.py
+++ b/streaming/python/raystreaming/tests/test_hybrid_stream.py
@@ -22,12 +22,6 @@ def sink_func1(x):
print("HybridStreamTest sink_func1 value:", x)
-@pytest.mark.skip(
- reason=(
- "We cannot fetch libstreaming_java.so from ray_dist.jar,"
- + "which will fixed later."
- )
-)
def test_hybrid_stream():
subprocess.check_call(
[