From d9481ada895a549eeddb0bb482538b3fa3b019f0 Mon Sep 17 00:00:00 2001 From: Lingxuan Zuo Date: Sat, 2 Apr 2022 12:55:55 +0000 Subject: [PATCH 1/6] fix java test build --- streaming/java/checkstyle-suppressions.xml | 1 + .../serialization/CrossLangSerializer.java | 6 ++++-- streaming/java/test.sh | 15 +-------------- 3 files changed, 6 insertions(+), 16 deletions(-) diff --git a/streaming/java/checkstyle-suppressions.xml b/streaming/java/checkstyle-suppressions.xml index 8a4fd5b4..62ef9d4f 100644 --- a/streaming/java/checkstyle-suppressions.xml +++ b/streaming/java/checkstyle-suppressions.xml @@ -16,6 +16,7 @@ + diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/serialization/CrossLangSerializer.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/serialization/CrossLangSerializer.java index ef258a7e..ee987146 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/serialization/CrossLangSerializer.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/serialization/CrossLangSerializer.java @@ -38,14 +38,16 @@ public Record deserialize(byte[] bytes) { List list = (List) msgPackSerializer.deserialize(bytes); Byte typeId = (Byte) list.get(0); switch (typeId) { - case RECORD_TYPE_ID: { + case RECORD_TYPE_ID: + { String stream = (String) list.get(1); Object value = list.get(2); Record record = new Record(value); record.setStream(stream); return record; } - case KEY_RECORD_TYPE_ID: { + case KEY_RECORD_TYPE_ID: + { String stream = (String) list.get(1); Object key = list.get(2); Object value = list.get(3); diff --git a/streaming/java/test.sh b/streaming/java/test.sh index 4b557afb..df48ee54 100755 --- a/streaming/java/test.sh +++ b/streaming/java/test.sh @@ -18,16 +18,8 @@ popd echo "build ray streaming" bazel build @com_github_ray_streaming//java:all -# Check that ray libstreaming_java doesn't include symbols from ray by accident. -# Otherwise the symbols may conflict. -symbols_conflict=$(nm bazel-bin/streaming/libstreaming_java.so | grep TaskFinisherInterface || true) -if [ -n "${symbols_conflict}" ]; then - echo "streaming should not include symbols from ray: ${symbols_conflict}" - exit 1 -fi - echo "Running streaming tests." -java -cp "$ROOT_DIR"/../../bazel-bin/streaming/java/all_streaming_tests_deploy.jar\ +java -cp "$ROOT_DIR"/../bazel-bin/java/all_streaming_tests_deploy.jar\ org.testng.TestNG -d /tmp/ray_streaming_java_test_output "$ROOT_DIR"/testng.xml || exit_code=$? if [ -z ${exit_code+x} ]; then @@ -63,11 +55,6 @@ if [ $exit_code -ne 2 ] && [ $exit_code -ne 0 ] ; then fi echo "Testing maven install." -cd "$ROOT_DIR"/../../java -echo "build ray maven deps" -bazel build gen_maven_deps -echo "maven install ray" -mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean install -DskipTests -Dcheckstyle.skip cd "$ROOT_DIR" echo "maven install ray streaming" mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean install -DskipTests -Dcheckstyle.skip From b9da324d8704b0fe9d56fa376f9eb31ec1c9be60 Mon Sep 17 00:00:00 2001 From: Lingxuan Zuo Date: Sat, 2 Apr 2022 15:14:57 +0000 Subject: [PATCH 2/6] import sys for hybird stream test python --- streaming/python/raystreaming/tests/test_hybrid_stream.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/streaming/python/raystreaming/tests/test_hybrid_stream.py b/streaming/python/raystreaming/tests/test_hybrid_stream.py index 0cbbfb2d..7bf8270a 100644 --- a/streaming/python/raystreaming/tests/test_hybrid_stream.py +++ b/streaming/python/raystreaming/tests/test_hybrid_stream.py @@ -1,6 +1,7 @@ import os import subprocess +import sys import ray from ray.streaming import StreamingContext from ray._private.test_utils import wait_for_condition @@ -35,7 +36,7 @@ def test_hybrid_stream(): jar_path = os.path.abspath(jar_path) print("jar_path", jar_path) assert not ray.is_initialized() - ray.init(job_config=ray.job_config.JobConfig(code_search_path=[jar_path])) + ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path + [jar_path])) sink_file = "/tmp/ray_streaming_test_hybrid_stream.txt" if os.path.exists(sink_file): From 736a9d4f04f9e14edcdb833d629bb7d734216bb7 Mon Sep 17 00:00:00 2001 From: Lingxuan Zuo Date: Sat, 2 Apr 2022 15:20:15 +0000 Subject: [PATCH 3/6] new a jni extractor jni for libstreaming java so After this jni util imported, we still fail to run hybird stream test. From test output logs, boost::asio::detail::epoll_reactor::interrupt crash thrown outside, which shall not show that rootcasue and problem details --- .../runtime/transfer/TransferHandler.java | 2 +- .../runtime/util/BinaryFileUtil.java | 80 ++++++++++++++++ .../ray/streaming/runtime/util/EnvUtil.java | 2 +- .../ray/streaming/runtime/util/JniUtils.java | 91 +++++++++++++++++++ 4 files changed, 173 insertions(+), 2 deletions(-) create mode 100644 streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/BinaryFileUtil.java create mode 100644 streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/JniUtils.java diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/TransferHandler.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/TransferHandler.java index 816273d8..a3f40516 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/TransferHandler.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/TransferHandler.java @@ -11,7 +11,7 @@ public class TransferHandler { static { JniUtils.loadLibrary(BinaryFileUtil.CORE_WORKER_JAVA_LIBRARY, true); - JniUtils.loadLibrary("streaming_java"); + io.ray.streaming.runtime.util.JniUtils.loadLibrary("streaming_java"); } private long writerClientNative; diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/BinaryFileUtil.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/BinaryFileUtil.java new file mode 100644 index 00000000..4717b506 --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/BinaryFileUtil.java @@ -0,0 +1,80 @@ +package io.ray.streaming.runtime.util; + +import com.google.common.base.Preconditions; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; +import java.nio.channels.FileLock; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.SystemUtils; + +public class BinaryFileUtil { + + public static final String STREAMING_JAVA_LIBRARY = "streaming_java"; + + /** + * Extract a platform-native resource file to destDir. Note that this a process-safe + * operation. If multi processes extract the file to same directory concurrently, this operation + * will be protected by a file lock. + * + * @param destDir a directory to extract resource file to + * @param fileName resource file name + * @return extracted resource file + */ + public static File getNativeFile(String destDir, String fileName) { + final File dir = new File(destDir); + if (!dir.exists()) { + try { + FileUtils.forceMkdir(dir); + } catch (IOException e) { + throw new RuntimeException("Couldn't make directory: " + dir.getAbsolutePath(), e); + } + } + String lockFilePath = destDir + File.separator + "file_lock"; + try (FileLock ignored = new RandomAccessFile(lockFilePath, "rw").getChannel().lock()) { + String resourceDir; + if (SystemUtils.IS_OS_MAC) { + resourceDir = "native/darwin/"; + } else if (SystemUtils.IS_OS_LINUX) { + resourceDir = "native/linux/"; + } else { + throw new UnsupportedOperationException("Unsupported os " + SystemUtils.OS_NAME); + } + /// File doesn't exist. Create a temp file and then rename it. + final String tempFilePath = String.format("%s/%s.tmp", destDir, fileName); + // Adding a temporary file here is used to fix the issue that when + // a java worker crashes during extracting dynamic library file, next + // java worker will use an incomplete file. The issue link is: + // + // https://github.com/ray-project/ray/issues/19341 + File tempFile = new File(tempFilePath); + + String resourcePath = resourceDir + fileName; + File destFile = new File(String.format("%s/%s", destDir, fileName)); + if (destFile.exists()) { + return destFile; + } + + // File does not exist. + try (InputStream is = BinaryFileUtil.class.getResourceAsStream("/" + resourcePath)) { + Preconditions.checkNotNull(is, "{} doesn't exist.", resourcePath); + Files.copy(is, Paths.get(tempFile.getCanonicalPath()), StandardCopyOption.REPLACE_EXISTING); + if (!tempFile.renameTo(destFile)) { + throw new RuntimeException( + String.format( + "Couldn't rename temp file(%s) to %s", + tempFile.getAbsolutePath(), destFile.getAbsolutePath())); + } + return destFile; + } catch (IOException e) { + throw new RuntimeException("Couldn't get temp file from resource " + resourcePath, e); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/EnvUtil.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/EnvUtil.java index 29ac29f4..d097f13e 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/EnvUtil.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/EnvUtil.java @@ -30,7 +30,7 @@ public static String getHostName() { public static void loadNativeLibraries() { JniUtils.loadLibrary(BinaryFileUtil.CORE_WORKER_JAVA_LIBRARY, true); - JniUtils.loadLibrary("streaming_java"); + io.ray.streaming.runtime.util.JniUtils.loadLibrary("streaming_java"); } /** diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/JniUtils.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/JniUtils.java new file mode 100644 index 00000000..666e3c37 --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/JniUtils.java @@ -0,0 +1,91 @@ +package io.ray.streaming.runtime.util; + +import com.google.common.collect.Sets; +import com.sun.jna.NativeLibrary; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JniUtils { + private static final Logger LOGGER = LoggerFactory.getLogger(JniUtils.class); + private static Set loadedLibs = Sets.newHashSet(); + private static String defaultDestDir; + + /** + * Loads the native library specified by the libraryName argument. The + * libraryName argument must not contain any platform specific prefix, file extension or + * path. + * + * @param libraryName the name of the library. + */ + public static synchronized void loadLibrary(String libraryName) { + loadLibrary(getDefaultDestDir(), libraryName); + } + + /** + * Loads the native library specified by the libraryName argument. The + * libraryName argument must not contain any platform specific prefix, file extension or + * path. + * + * @param libraryName the name of the library. + * @param exportSymbols export symbols of library so that it can be used by other libs. + */ + public static synchronized void loadLibrary(String libraryName, boolean exportSymbols) { + loadLibrary(getDefaultDestDir(), libraryName, exportSymbols); + } + + /** + * Loads the native library specified by the libraryName argument. The + * libraryName argument must not contain any platform specific prefix, file extension or + * path. + * + * @param destDir The destination dir the library to be extracted. + * @param libraryName the name of the library. + */ + public static synchronized void loadLibrary(String destDir, String libraryName) { + loadLibrary(destDir, libraryName, false); + } + + /** + * Loads the native library specified by the libraryName argument. The + * libraryName argument must not contain any platform specific prefix, file extension or + * path. + * + * @param destDir The destination dir the library to be extracted. + * @param libraryName the name of the library. + * @param exportSymbols export symbols of library so that it can be used by other libs. + */ + public static synchronized void loadLibrary( + String destDir, String libraryName, boolean exportSymbols) { + if (!loadedLibs.contains(libraryName)) { + LOGGER.debug("Loading native library {} in {}.", libraryName, destDir); + // Load native library. + String fileName = System.mapLibraryName(libraryName); + final File file = BinaryFileUtil.getNativeFile(destDir, fileName); + + if (exportSymbols) { + // Expose library symbols using RTLD_GLOBAL which may be depended by other shared + // libraries. + NativeLibrary.getInstance(file.getAbsolutePath()); + } + System.load(file.getAbsolutePath()); + LOGGER.debug("Native library loaded."); + loadedLibs.add(libraryName); + } + } + + /** Cache the result so that multiple calls return the same dest dir. */ + private static synchronized String getDefaultDestDir() { + if (defaultDestDir == null) { + try { + defaultDestDir = Files.createTempDirectory("native_libs").toString(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return defaultDestDir; + } +} From f95aa0da0a52b917920a71c514e5ce2c65c562e7 Mon Sep 17 00:00:00 2001 From: Lingxuan Zuo Date: Sat, 2 Apr 2022 15:28:14 +0000 Subject: [PATCH 4/6] append jna maven deps for jni util class --- streaming/java/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/streaming/java/BUILD.bazel b/streaming/java/BUILD.bazel index 8190e495..2d59435e 100644 --- a/streaming/java/BUILD.bazel +++ b/streaming/java/BUILD.bazel @@ -147,6 +147,7 @@ define_java_module( "@maven//:com_google_protobuf_protobuf_java", "@maven//:commons_io_commons_io", "@maven//:de_ruedigermoeller_fst", + "@maven//:net_java_dev_jna_jna", "@maven//:org_apache_commons_commons_lang3", "@maven//:org_apache_logging_log4j_log4j_api", "@maven//:org_apache_logging_log4j_log4j_core", From e74675241be0ebe379759e1069a5f90193a5cca6 Mon Sep 17 00:00:00 2001 From: Lingxuan Zuo Date: Thu, 7 Apr 2022 13:30:33 +0000 Subject: [PATCH 5/6] remove duplicated sys imported --- streaming/python/raystreaming/tests/test_hybrid_stream.py | 1 - 1 file changed, 1 deletion(-) diff --git a/streaming/python/raystreaming/tests/test_hybrid_stream.py b/streaming/python/raystreaming/tests/test_hybrid_stream.py index 51090c13..a850b3e7 100644 --- a/streaming/python/raystreaming/tests/test_hybrid_stream.py +++ b/streaming/python/raystreaming/tests/test_hybrid_stream.py @@ -2,7 +2,6 @@ import subprocess import sys -import sys import ray from ray.streaming import StreamingContext from ray._private.test_utils import wait_for_condition From 699fd3b359e419c60b6143bd1c80e7a6a00e6220 Mon Sep 17 00:00:00 2001 From: Lingxuan Zuo Date: Tue, 19 Apr 2022 02:29:16 +0000 Subject: [PATCH 6/6] lease hybird python stream test --- streaming/python/raystreaming/tests/test_hybrid_stream.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/streaming/python/raystreaming/tests/test_hybrid_stream.py b/streaming/python/raystreaming/tests/test_hybrid_stream.py index a850b3e7..715e0e66 100644 --- a/streaming/python/raystreaming/tests/test_hybrid_stream.py +++ b/streaming/python/raystreaming/tests/test_hybrid_stream.py @@ -22,12 +22,6 @@ def sink_func1(x): print("HybridStreamTest sink_func1 value:", x) -@pytest.mark.skip( - reason=( - "We cannot fetch libstreaming_java.so from ray_dist.jar," - + "which will fixed later." - ) -) def test_hybrid_stream(): subprocess.check_call( [