Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Java]Java hybird test build #45

Merged
merged 8 commits into from
Apr 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions streaming/java/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ define_java_module(
"@maven//:com_google_protobuf_protobuf_java",
"@maven//:commons_io_commons_io",
"@maven//:de_ruedigermoeller_fst",
"@maven//:net_java_dev_jna_jna",
BalaBalaYi marked this conversation as resolved.
Show resolved Hide resolved
"@maven//:org_apache_commons_commons_lang3",
"@maven//:org_apache_logging_log4j_log4j_api",
"@maven//:org_apache_logging_log4j_log4j_core",
Expand Down
1 change: 1 addition & 0 deletions streaming/java/checkstyle-suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<suppress files=".*" checks="RequireEmptyLineBeforeBlockTagGroup"/>
<suppress files=".*" checks="SingleLineJavadoc"/>
<suppress files=".*" checks="VariableDeclarationUsageDistance"/>
<suppress files=".*" checks="LeftCurly"/>
<!-- suppress check for flatbuffer-generated files. -->
<suppress checks=".*" files="io[\\/]ray[\\/]streaming[\\/]runtime[\\/]generated[\\/]"/>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,16 @@ public Record deserialize(byte[] bytes) {
List list = (List) msgPackSerializer.deserialize(bytes);
Byte typeId = (Byte) list.get(0);
switch (typeId) {
case RECORD_TYPE_ID: {
case RECORD_TYPE_ID:
{
BalaBalaYi marked this conversation as resolved.
Show resolved Hide resolved
String stream = (String) list.get(1);
Object value = list.get(2);
Record record = new Record(value);
record.setStream(stream);
return record;
}
case KEY_RECORD_TYPE_ID: {
case KEY_RECORD_TYPE_ID:
{
String stream = (String) list.get(1);
Object key = list.get(2);
Object value = list.get(3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class TransferHandler {

static {
JniUtils.loadLibrary(BinaryFileUtil.CORE_WORKER_JAVA_LIBRARY, true);
JniUtils.loadLibrary("streaming_java");
io.ray.streaming.runtime.util.JniUtils.loadLibrary("streaming_java");
}

private long writerClientNative;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package io.ray.streaming.runtime.util;

import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.SystemUtils;

public class BinaryFileUtil {

public static final String STREAMING_JAVA_LIBRARY = "streaming_java";

/**
* Extract a platform-native resource file to <code>destDir</code>. Note that this a process-safe
* operation. If multi processes extract the file to same directory concurrently, this operation
* will be protected by a file lock.
*
* @param destDir a directory to extract resource file to
* @param fileName resource file name
* @return extracted resource file
*/
public static File getNativeFile(String destDir, String fileName) {
final File dir = new File(destDir);
if (!dir.exists()) {
try {
FileUtils.forceMkdir(dir);
} catch (IOException e) {
throw new RuntimeException("Couldn't make directory: " + dir.getAbsolutePath(), e);
}
}
String lockFilePath = destDir + File.separator + "file_lock";
try (FileLock ignored = new RandomAccessFile(lockFilePath, "rw").getChannel().lock()) {
String resourceDir;
if (SystemUtils.IS_OS_MAC) {
resourceDir = "native/darwin/";
} else if (SystemUtils.IS_OS_LINUX) {
resourceDir = "native/linux/";
} else {
throw new UnsupportedOperationException("Unsupported os " + SystemUtils.OS_NAME);
}
/// File doesn't exist. Create a temp file and then rename it.
final String tempFilePath = String.format("%s/%s.tmp", destDir, fileName);
// Adding a temporary file here is used to fix the issue that when
// a java worker crashes during extracting dynamic library file, next
// java worker will use an incomplete file. The issue link is:
//
// https://github.com/ray-project/ray/issues/19341
File tempFile = new File(tempFilePath);

String resourcePath = resourceDir + fileName;
File destFile = new File(String.format("%s/%s", destDir, fileName));
if (destFile.exists()) {
return destFile;
}

// File does not exist.
try (InputStream is = BinaryFileUtil.class.getResourceAsStream("/" + resourcePath)) {
Preconditions.checkNotNull(is, "{} doesn't exist.", resourcePath);
Files.copy(is, Paths.get(tempFile.getCanonicalPath()), StandardCopyOption.REPLACE_EXISTING);
if (!tempFile.renameTo(destFile)) {
throw new RuntimeException(
String.format(
"Couldn't rename temp file(%s) to %s",
tempFile.getAbsolutePath(), destFile.getAbsolutePath()));
}
return destFile;
} catch (IOException e) {
throw new RuntimeException("Couldn't get temp file from resource " + resourcePath, e);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public static String getHostName() {

public static void loadNativeLibraries() {
JniUtils.loadLibrary(BinaryFileUtil.CORE_WORKER_JAVA_LIBRARY, true);
JniUtils.loadLibrary("streaming_java");
io.ray.streaming.runtime.util.JniUtils.loadLibrary("streaming_java");
BalaBalaYi marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package io.ray.streaming.runtime.util;

import com.google.common.collect.Sets;
import com.sun.jna.NativeLibrary;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JniUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(JniUtils.class);
private static Set<String> loadedLibs = Sets.newHashSet();
private static String defaultDestDir;

/**
* Loads the native library specified by the <code>libraryName</code> argument. The <code>
* libraryName</code> argument must not contain any platform specific prefix, file extension or
* path.
*
* @param libraryName the name of the library.
*/
public static synchronized void loadLibrary(String libraryName) {
loadLibrary(getDefaultDestDir(), libraryName);
}

/**
* Loads the native library specified by the <code>libraryName</code> argument. The <code>
* libraryName</code> argument must not contain any platform specific prefix, file extension or
* path.
*
* @param libraryName the name of the library.
* @param exportSymbols export symbols of library so that it can be used by other libs.
*/
public static synchronized void loadLibrary(String libraryName, boolean exportSymbols) {
loadLibrary(getDefaultDestDir(), libraryName, exportSymbols);
}

/**
* Loads the native library specified by the <code>libraryName</code> argument. The <code>
* libraryName</code> argument must not contain any platform specific prefix, file extension or
* path.
*
* @param destDir The destination dir the library to be extracted.
* @param libraryName the name of the library.
*/
public static synchronized void loadLibrary(String destDir, String libraryName) {
loadLibrary(destDir, libraryName, false);
}

/**
* Loads the native library specified by the <code>libraryName</code> argument. The <code>
* libraryName</code> argument must not contain any platform specific prefix, file extension or
* path.
*
* @param destDir The destination dir the library to be extracted.
* @param libraryName the name of the library.
* @param exportSymbols export symbols of library so that it can be used by other libs.
*/
public static synchronized void loadLibrary(
String destDir, String libraryName, boolean exportSymbols) {
if (!loadedLibs.contains(libraryName)) {
LOGGER.debug("Loading native library {} in {}.", libraryName, destDir);
// Load native library.
String fileName = System.mapLibraryName(libraryName);
final File file = BinaryFileUtil.getNativeFile(destDir, fileName);

if (exportSymbols) {
// Expose library symbols using RTLD_GLOBAL which may be depended by other shared
// libraries.
NativeLibrary.getInstance(file.getAbsolutePath());
}
System.load(file.getAbsolutePath());
LOGGER.debug("Native library loaded.");
loadedLibs.add(libraryName);
}
}

/** Cache the result so that multiple calls return the same dest dir. */
private static synchronized String getDefaultDestDir() {
if (defaultDestDir == null) {
try {
defaultDestDir = Files.createTempDirectory("native_libs").toString();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return defaultDestDir;
}
}
15 changes: 1 addition & 14 deletions streaming/java/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,8 @@ popd
echo "build ray streaming"
bazel build @com_github_ray_streaming//java:all

# Check that ray libstreaming_java doesn't include symbols from ray by accident.
# Otherwise the symbols may conflict.
symbols_conflict=$(nm bazel-bin/streaming/libstreaming_java.so | grep TaskFinisherInterface || true)
if [ -n "${symbols_conflict}" ]; then
echo "streaming should not include symbols from ray: ${symbols_conflict}"
exit 1
fi

echo "Running streaming tests."
java -cp "$ROOT_DIR"/../../bazel-bin/streaming/java/all_streaming_tests_deploy.jar\
java -cp "$ROOT_DIR"/../bazel-bin/java/all_streaming_tests_deploy.jar\
org.testng.TestNG -d /tmp/ray_streaming_java_test_output "$ROOT_DIR"/testng.xml ||
exit_code=$?
if [ -z ${exit_code+x} ]; then
Expand Down Expand Up @@ -63,11 +55,6 @@ if [ $exit_code -ne 2 ] && [ $exit_code -ne 0 ] ; then
fi

echo "Testing maven install."
cd "$ROOT_DIR"/../../java
echo "build ray maven deps"
bazel build gen_maven_deps
echo "maven install ray"
mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean install -DskipTests -Dcheckstyle.skip
cd "$ROOT_DIR"
echo "maven install ray streaming"
mvn -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN clean install -DskipTests -Dcheckstyle.skip
6 changes: 0 additions & 6 deletions streaming/python/raystreaming/tests/test_hybrid_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@ def sink_func1(x):
print("HybridStreamTest sink_func1 value:", x)


@pytest.mark.skip(
reason=(
"We cannot fetch libstreaming_java.so from ray_dist.jar,"
+ "which will fixed later."
)
)
def test_hybrid_stream():
subprocess.check_call(
[
Expand Down