From 082d98772690946ed29c157e60640c97a6e1195b Mon Sep 17 00:00:00 2001 From: larsrc Date: Wed, 13 Jan 2021 06:40:59 -0800 Subject: [PATCH] Implement available() method for Windows subprocesses. RELNOTES: None PiperOrigin-RevId: 351574909 --- .../build/lib/windows/WindowsProcesses.java | 6 ++ .../build/lib/windows/WindowsSubprocess.java | 14 +++++ src/main/native/windows/processes-jni.cc | 58 +++++++++++++++++++ .../lib/windows/WindowsProcessesTest.java | 38 ++++++++++++ .../lib/windows/WindowsSubprocessTest.java | 20 +++++++ 5 files changed, 136 insertions(+) diff --git a/src/main/java/com/google/devtools/build/lib/windows/WindowsProcesses.java b/src/main/java/com/google/devtools/build/lib/windows/WindowsProcesses.java index 531edf5dd0c51f..cf3eb700c34ca3 100644 --- a/src/main/java/com/google/devtools/build/lib/windows/WindowsProcesses.java +++ b/src/main/java/com/google/devtools/build/lib/windows/WindowsProcesses.java @@ -76,6 +76,12 @@ public static long createProcess( /** Returns an opaque identifier of stderr stream for the process. */ public static native long getStderr(long process); + /** + * Returns an estimate of the number of bytes available to read on the stream. Unlike {@link + * InputStream#available()}, this returns 0 on closed or broken streams. + */ + public static native int streamBytesAvailable(long stream); + /** * Reads data from the stream into the given array. {@code stream} should come from {@link * #getStdout(long)} or {@link #getStderr(long)}. diff --git a/src/main/java/com/google/devtools/build/lib/windows/WindowsSubprocess.java b/src/main/java/com/google/devtools/build/lib/windows/WindowsSubprocess.java index 69b8d35b7cc6a3..ae67fd645e23b6 100644 --- a/src/main/java/com/google/devtools/build/lib/windows/WindowsSubprocess.java +++ b/src/main/java/com/google/devtools/build/lib/windows/WindowsSubprocess.java @@ -69,6 +69,20 @@ private static final class ProcessInputStream extends InputStream { this.nativeStream = nativeStream; } + @Override + public int available() throws IOException { + if (nativeStream == WindowsProcesses.INVALID) { + throw new IllegalStateException("Stream already closed"); + } + + int result = WindowsProcesses.streamBytesAvailable(nativeStream); + if (result == -1) { + throw new IOException(WindowsProcesses.streamGetLastError(nativeStream)); + } + + return result; + } + @Override public int read() throws IOException { byte[] buf = new byte[1]; diff --git a/src/main/native/windows/processes-jni.cc b/src/main/native/windows/processes-jni.cc index ee0166a3f3b067..632ce1dca7ac81 100644 --- a/src/main/native/windows/processes-jni.cc +++ b/src/main/native/windows/processes-jni.cc @@ -97,6 +97,33 @@ class NativeOutputStream { void SetHandle(HANDLE handle) { handle_ = handle; } + jint StreamBytesAvailable(JNIEnv* env) { + if (closed_.load() || handle_ == INVALID_HANDLE_VALUE) { + error_ = L""; + return 0; + } + + DWORD avail = 0; + if (!::PeekNamedPipe(handle_, NULL, 0, NULL, &avail, NULL)) { + // Check if either the other end closed the pipe or we did it with + // NativeOutputStream.Close() . In the latter case, we'll get a "system + // call interrupted" error. + if (GetLastError() == ERROR_BROKEN_PIPE || closed_.load()) { + error_ = L""; + return 0; + } else { + DWORD err_code = GetLastError(); + error_ = bazel::windows::MakeErrorMessage(WSTR(__FILE__), __LINE__, + L"nativeStreamBytesAvailable", + L"", err_code); + return -1; + } + } else { + error_ = L""; + } + return avail; + } + jint ReadStream(JNIEnv* env, jbyteArray java_bytes, jint offset, jint length) { JavaByteArray bytes(env, java_bytes); @@ -210,6 +237,7 @@ class NativeProcess { } } + // Set up childs stdin pipe. { HANDLE pipe_read_h, pipe_write_h; if (!CreatePipe(&pipe_read_h, &pipe_write_h, &sa, 0)) { @@ -220,6 +248,14 @@ class NativeProcess { } stdin_process = pipe_read_h; stdin_ = pipe_write_h; + + // "Our" end of the pipe must not be inherited by the child process + if (!SetHandleInformation(pipe_write_h, HANDLE_FLAG_INHERIT, 0)) { + DWORD err_code = GetLastError(); + error_ = bazel::windows::MakeErrorMessage( + WSTR(__FILE__), __LINE__, L"nativeCreateProcess", wpath, err_code); + return false; + } } if (!stdout_is_stream) { @@ -260,6 +296,13 @@ class NativeProcess { } stdout_.SetHandle(pipe_read_h); stdout_process = pipe_write_h; + // "Our" end of the pipe must not be inherited by the child process + if (!SetHandleInformation(pipe_read_h, HANDLE_FLAG_INHERIT, 0)) { + DWORD err_code = GetLastError(); + error_ = bazel::windows::MakeErrorMessage( + WSTR(__FILE__), __LINE__, L"nativeCreateProcess", wpath, err_code); + return false; + } } if (stderr_same_handle_as_stdout) { @@ -314,6 +357,13 @@ class NativeProcess { } stderr_.SetHandle(pipe_read_h); stderr_process = pipe_write_h; + // "Our" end of the pipe must not be inherited by the child process + if (!SetHandleInformation(pipe_read_h, HANDLE_FLAG_INHERIT, 0)) { + DWORD err_code = GetLastError(); + error_ = bazel::windows::MakeErrorMessage( + WSTR(__FILE__), __LINE__, L"nativeCreateProcess", wpath, err_code); + return false; + } } return proc_.Create( wpath, bazel::windows::GetJavaWstring(env, java_argv_rest), @@ -443,6 +493,14 @@ Java_com_google_devtools_build_lib_windows_WindowsProcesses_readStream( return stream->ReadStream(env, java_bytes, offset, length); } +extern "C" JNIEXPORT jint JNICALL +Java_com_google_devtools_build_lib_windows_WindowsProcesses_streamBytesAvailable( + JNIEnv* env, jclass clazz, jlong stream_long) { + NativeOutputStream* stream = + reinterpret_cast(stream_long); + return stream->StreamBytesAvailable(env); +} + extern "C" JNIEXPORT jint JNICALL Java_com_google_devtools_build_lib_windows_WindowsProcesses_getExitCode( JNIEnv* env, jclass clazz, jlong process_long) { diff --git a/src/test/java/com/google/devtools/build/lib/windows/WindowsProcessesTest.java b/src/test/java/com/google/devtools/build/lib/windows/WindowsProcessesTest.java index 20596bc061973e..5030711f2d865b 100644 --- a/src/test/java/com/google/devtools/build/lib/windows/WindowsProcessesTest.java +++ b/src/test/java/com/google/devtools/build/lib/windows/WindowsProcessesTest.java @@ -198,6 +198,44 @@ public void testPartialRead() throws Exception { assertThat(new String(two, UTF_8)).isEqualTo("LLO"); } + @Test + public void testAvailable_givesBytesFromLiveProcess() throws Exception { + process = + WindowsProcesses.createProcess(mockBinary, mockArgs("O-HELLOWRLD"), null, null, null, null); + byte[] one = new byte[2]; + byte[] two = new byte[3]; + + long stdout = WindowsProcesses.getStdout(process); + // Need to wait until the process has posted its data before we can check available() + assertThat(readStdout(one, 0, 2)).isEqualTo(2); + assertNoStreamError(stdout); + assertThat(WindowsProcesses.streamBytesAvailable(stdout)).isEqualTo(7); + assertNoStreamError(stdout); + + assertThat(readStdout(two, 0, 3)).isEqualTo(3); + assertNoStreamError(stdout); + assertThat(WindowsProcesses.streamBytesAvailable(stdout)).isEqualTo(4); + assertNoStreamError(stdout); + + WindowsProcesses.closeStream(stdout); + assertThat(WindowsProcesses.streamBytesAvailable(stdout)).isEqualTo(0); + assertThat(WindowsProcesses.streamGetLastError(stdout)).isEmpty(); + + assertThat(new String(one, UTF_8)).isEqualTo("HE"); + assertThat(new String(two, UTF_8)).isEqualTo("LLO"); + } + + @Test + public void testAvailable_doesNotFailOnDeadProcess() throws Exception { + process = WindowsProcesses.createProcess(mockBinary, mockArgs("X42"), null, null, null, null); + long stdout = WindowsProcesses.getStdout(process); + assertThat(WindowsProcesses.waitFor(process, -1)).isEqualTo(0); + assertThat(WindowsProcesses.getExitCode(process)).isEqualTo(42); + // Windows allows streams to be read after the process has died. + assertThat(WindowsProcesses.streamBytesAvailable(stdout)).isAtLeast(0); + assertThat(WindowsProcesses.streamGetLastError(stdout)).isEmpty(); + } + @Test public void testArrayOutOfBounds() throws Exception { process = diff --git a/src/test/java/com/google/devtools/build/lib/windows/WindowsSubprocessTest.java b/src/test/java/com/google/devtools/build/lib/windows/WindowsSubprocessTest.java index d44a9481e231b6..a305285272ee7e 100644 --- a/src/test/java/com/google/devtools/build/lib/windows/WindowsSubprocessTest.java +++ b/src/test/java/com/google/devtools/build/lib/windows/WindowsSubprocessTest.java @@ -16,6 +16,7 @@ import static com.google.common.truth.Truth.assertThat; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.Assert.assertThrows; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -26,6 +27,7 @@ import com.google.devtools.build.lib.util.OS; import com.google.devtools.build.runfiles.Runfiles; import java.io.File; +import java.io.InputStream; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -125,6 +127,24 @@ public void testSystemDriveIsSet() throws Exception { assertThat(new String(buf, UTF_8).trim()).isEqualTo("X:"); } + @Test + public void testStreamAvailable_zeroAfterClose() throws Exception { + SubprocessBuilder subprocessBuilder = new SubprocessBuilder(WindowsSubprocessFactory.INSTANCE); + subprocessBuilder.setWorkingDirectory(new File(".")); + subprocessBuilder.setArgv(ImmutableList.of(mockBinary, "-jar", mockSubprocess, "OHELLO")); + process = subprocessBuilder.start(); + InputStream inputStream = process.getInputStream(); + // We don't know if the process has already written to the pipe + assertThat(inputStream.available()).isAnyOf(0, 5); + process.waitFor(); + // Windows allows streams to be read after the process has died. + assertThat(inputStream.available()).isAnyOf(0, 5); + inputStream.close(); + assertThrows(IllegalStateException.class, inputStream::available) + .getMessage() + .contains("Stream already closed"); + } + /** * An argument and its command-line-escaped counterpart. *